You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by sa...@apache.org on 2018/08/02 02:15:14 UTC

hadoop git commit: HADOOP-15607. AliyunOSS: fix duplicated partNumber issue in AliyunOSSBlockOutputStream. Contributed by Jinhu Wu.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 21e416ad2 -> 418e957c6


HADOOP-15607. AliyunOSS: fix duplicated partNumber issue in AliyunOSSBlockOutputStream. Contributed by Jinhu Wu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/418e957c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/418e957c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/418e957c

Branch: refs/heads/branch-2
Commit: 418e957c64cc31f13ea07c1b9d47208dcb4b4101
Parents: 21e416a
Author: Sammi Chen <sa...@intel.com>
Authored: Thu Aug 2 10:13:22 2018 +0800
Committer: Sammi Chen <sa...@intel.com>
Committed: Thu Aug 2 10:14:54 2018 +0800

----------------------------------------------------------------------
 .../aliyun/oss/AliyunOSSBlockOutputStream.java  | 59 ++++++++++++--------
 .../fs/aliyun/oss/AliyunOSSFileSystemStore.java |  2 +
 .../oss/TestAliyunOSSBlockOutputStream.java     | 12 +++-
 3 files changed, 49 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/418e957c/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
index 2d9a13b..42cb0b1 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
@@ -33,7 +33,9 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -51,7 +53,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
   private boolean closed;
   private String key;
   private File blockFile;
-  private List<File> blockFiles = new ArrayList<>();
+  private Map<Integer, File> blockFiles = new HashMap<>();
   private long blockSize;
   private int blockId = 0;
   private long blockWritten = 0L;
@@ -95,8 +97,9 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
 
     blockStream.flush();
     blockStream.close();
-    if (!blockFiles.contains(blockFile)) {
-      blockFiles.add(blockFile);
+    if (!blockFiles.values().contains(blockFile)) {
+      blockId++;
+      blockFiles.put(blockId, blockFile);
     }
 
     try {
@@ -110,7 +113,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
                 @Override
                 public PartETag call() throws Exception {
                   PartETag partETag = store.uploadPart(blockFile, key, uploadId,
-                      blockId + 1);
+                      blockId);
                   return partETag;
                 }
               });
@@ -124,11 +127,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
         store.completeMultipartUpload(key, uploadId, partETags);
       }
     } finally {
-      for (File tFile: blockFiles) {
-        if (tFile.exists() && !tFile.delete()) {
-          LOG.warn("Failed to delete temporary file {}", tFile);
-        }
-      }
+      removePartFiles();
       closed = true;
     }
   }
@@ -145,41 +144,55 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
     if (closed) {
       throw new IOException("Stream closed.");
     }
-    try {
-      blockStream.write(b, off, len);
-      blockWritten += len;
-      if (blockWritten >= blockSize) {
-        uploadCurrentPart();
-        blockWritten = 0L;
+    blockStream.write(b, off, len);
+    blockWritten += len;
+    if (blockWritten >= blockSize) {
+      uploadCurrentPart();
+      blockWritten = 0L;
+    }
+  }
+
+  private void removePartFiles() throws IOException {
+    for (ListenableFuture<PartETag> partETagFuture : partETagsFutures) {
+      if (!partETagFuture.isDone()) {
+        continue;
       }
-    } finally {
-      for (File tFile: blockFiles) {
-        if (tFile.exists() && !tFile.delete()) {
-          LOG.warn("Failed to delete temporary file {}", tFile);
+
+      try {
+        File blockFile = blockFiles.get(partETagFuture.get().getPartNumber());
+        if (blockFile != null && blockFile.exists() && !blockFile.delete()) {
+          LOG.warn("Failed to delete temporary file {}", blockFile);
         }
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException(e);
       }
     }
   }
 
   private void uploadCurrentPart() throws IOException {
-    blockFiles.add(blockFile);
     blockStream.flush();
     blockStream.close();
     if (blockId == 0) {
       uploadId = store.getUploadId(key);
     }
+
+    blockId++;
+    blockFiles.put(blockId, blockFile);
+
+    final File currentFile = blockFile;
+    final int currentBlockId = blockId;
     ListenableFuture<PartETag> partETagFuture =
         executorService.submit(new Callable<PartETag>() {
           @Override
           public PartETag call() throws Exception {
-            PartETag partETag = store.uploadPart(blockFile, key, uploadId,
-                blockId + 1);
+            PartETag partETag = store.uploadPart(currentFile, key, uploadId,
+                currentBlockId);
             return partETag;
           }
         });
     partETagsFutures.add(partETagFuture);
+    removePartFiles();
     blockFile = newBlockFile();
-    blockId++;
     blockStream = new BufferedOutputStream(new FileOutputStream(blockFile));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/418e957c/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
index 4036215..0f99cd6 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
@@ -450,6 +450,8 @@ public class AliyunOSSFileSystemStore {
       request.setRange(byteStart, byteEnd);
       return ossClient.getObject(request).getObjectContent();
     } catch (OSSException | ClientException e) {
+      LOG.error("Exception thrown when store retrieves key: "
+              + key + ", exception: " + e);
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/418e957c/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
index 365d931..6fe6f03 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
@@ -31,6 +31,7 @@ import org.junit.rules.Timeout;
 import java.io.IOException;
 
 import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
 
 /**
  * Tests regular and multi-part upload functionality for
@@ -48,7 +49,10 @@ public class TestAliyunOSSBlockOutputStream {
   public void setUp() throws Exception {
     Configuration conf = new Configuration();
     conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
-    conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 5 * 1024 * 1024);
+    conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
+    conf.setInt(IO_CHUNK_BUFFER_SIZE,
+        conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
+    conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20);
     fs = AliyunOSSTestUtils.createTestFileSystem(conf);
   }
 
@@ -85,6 +89,12 @@ public class TestAliyunOSSBlockOutputStream {
   }
 
   @Test
+  public void testMultiPartUploadConcurrent() throws IOException {
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
+        50 * 1024 * 1024 - 1);
+  }
+
+  @Test
   public void testHugeUpload() throws IOException {
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
         MULTIPART_UPLOAD_PART_SIZE_DEFAULT - 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org