You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this copy.
Posted to common-commits@hadoop.apache.org by sa...@apache.org on 2018/08/02 02:15:14 UTC
hadoop git commit: HADOOP-15607. AliyunOSS: fix duplicated partNumber issue in AliyunOSSBlockOutputStream. Contributed by Jinhu Wu.
Repository: hadoop
Updated Branches:
refs/heads/branch-2 21e416ad2 -> 418e957c6
HADOOP-15607. AliyunOSS: fix duplicated partNumber issue in AliyunOSSBlockOutputStream. Contributed by Jinhu Wu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/418e957c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/418e957c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/418e957c
Branch: refs/heads/branch-2
Commit: 418e957c64cc31f13ea07c1b9d47208dcb4b4101
Parents: 21e416a
Author: Sammi Chen <sa...@intel.com>
Authored: Thu Aug 2 10:13:22 2018 +0800
Committer: Sammi Chen <sa...@intel.com>
Committed: Thu Aug 2 10:14:54 2018 +0800
----------------------------------------------------------------------
.../aliyun/oss/AliyunOSSBlockOutputStream.java | 59 ++++++++++++--------
.../fs/aliyun/oss/AliyunOSSFileSystemStore.java | 2 +
.../oss/TestAliyunOSSBlockOutputStream.java | 12 +++-
3 files changed, 49 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/418e957c/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
index 2d9a13b..42cb0b1 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
@@ -33,7 +33,9 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -51,7 +53,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
private boolean closed;
private String key;
private File blockFile;
- private List<File> blockFiles = new ArrayList<>();
+ private Map<Integer, File> blockFiles = new HashMap<>();
private long blockSize;
private int blockId = 0;
private long blockWritten = 0L;
@@ -95,8 +97,9 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
blockStream.flush();
blockStream.close();
- if (!blockFiles.contains(blockFile)) {
- blockFiles.add(blockFile);
+ if (!blockFiles.values().contains(blockFile)) {
+ blockId++;
+ blockFiles.put(blockId, blockFile);
}
try {
@@ -110,7 +113,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
@Override
public PartETag call() throws Exception {
PartETag partETag = store.uploadPart(blockFile, key, uploadId,
- blockId + 1);
+ blockId);
return partETag;
}
});
@@ -124,11 +127,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
store.completeMultipartUpload(key, uploadId, partETags);
}
} finally {
- for (File tFile: blockFiles) {
- if (tFile.exists() && !tFile.delete()) {
- LOG.warn("Failed to delete temporary file {}", tFile);
- }
- }
+ removePartFiles();
closed = true;
}
}
@@ -145,41 +144,55 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
if (closed) {
throw new IOException("Stream closed.");
}
- try {
- blockStream.write(b, off, len);
- blockWritten += len;
- if (blockWritten >= blockSize) {
- uploadCurrentPart();
- blockWritten = 0L;
+ blockStream.write(b, off, len);
+ blockWritten += len;
+ if (blockWritten >= blockSize) {
+ uploadCurrentPart();
+ blockWritten = 0L;
+ }
+ }
+
+ private void removePartFiles() throws IOException {
+ for (ListenableFuture<PartETag> partETagFuture : partETagsFutures) {
+ if (!partETagFuture.isDone()) {
+ continue;
}
- } finally {
- for (File tFile: blockFiles) {
- if (tFile.exists() && !tFile.delete()) {
- LOG.warn("Failed to delete temporary file {}", tFile);
+
+ try {
+ File blockFile = blockFiles.get(partETagFuture.get().getPartNumber());
+ if (blockFile != null && blockFile.exists() && !blockFile.delete()) {
+ LOG.warn("Failed to delete temporary file {}", blockFile);
}
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IOException(e);
}
}
}
private void uploadCurrentPart() throws IOException {
- blockFiles.add(blockFile);
blockStream.flush();
blockStream.close();
if (blockId == 0) {
uploadId = store.getUploadId(key);
}
+
+ blockId++;
+ blockFiles.put(blockId, blockFile);
+
+ final File currentFile = blockFile;
+ final int currentBlockId = blockId;
ListenableFuture<PartETag> partETagFuture =
executorService.submit(new Callable<PartETag>() {
@Override
public PartETag call() throws Exception {
- PartETag partETag = store.uploadPart(blockFile, key, uploadId,
- blockId + 1);
+ PartETag partETag = store.uploadPart(currentFile, key, uploadId,
+ currentBlockId);
return partETag;
}
});
partETagsFutures.add(partETagFuture);
+ removePartFiles();
blockFile = newBlockFile();
- blockId++;
blockStream = new BufferedOutputStream(new FileOutputStream(blockFile));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/418e957c/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
index 4036215..0f99cd6 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
@@ -450,6 +450,8 @@ public class AliyunOSSFileSystemStore {
request.setRange(byteStart, byteEnd);
return ossClient.getObject(request).getObjectContent();
} catch (OSSException | ClientException e) {
+ LOG.error("Exception thrown when store retrieves key: "
+ + key + ", exception: " + e);
return null;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/418e957c/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
index 365d931..6fe6f03 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
@@ -31,6 +31,7 @@ import org.junit.rules.Timeout;
import java.io.IOException;
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
/**
* Tests regular and multi-part upload functionality for
@@ -48,7 +49,10 @@ public class TestAliyunOSSBlockOutputStream {
public void setUp() throws Exception {
Configuration conf = new Configuration();
conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
- conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 5 * 1024 * 1024);
+ conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
+ conf.setInt(IO_CHUNK_BUFFER_SIZE,
+ conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
+ conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20);
fs = AliyunOSSTestUtils.createTestFileSystem(conf);
}
@@ -85,6 +89,12 @@ public class TestAliyunOSSBlockOutputStream {
}
@Test
+ public void testMultiPartUploadConcurrent() throws IOException {
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
+ 50 * 1024 * 1024 - 1);
+ }
+
+ @Test
public void testHugeUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
MULTIPART_UPLOAD_PART_SIZE_DEFAULT - 1);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org