You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by at...@apache.org on 2014/02/26 21:28:42 UTC
svn commit: r1572235 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/fs/s3native/ src/main/resources/
src/test/java/org/apache/hadoop/fs/s3native/ src/test/resources/
Author: atm
Date: Wed Feb 26 20:28:41 2014
New Revision: 1572235
URL: http://svn.apache.org/r1572235
Log:
HADOOP-9454. Support multipart uploads for s3native. Contributed by Jordan Mendelson and Akira AJISAKA.
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1572235&r1=1572234&r2=1572235&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Wed Feb 26 20:28:41 2014
@@ -343,6 +343,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10348. Deprecate hadoop.ssl.configuration in branch-2, and remove
it in trunk. (Haohui Mai via jing9)
+ HADOOP-9454. Support multipart uploads for s3native. (Jordan Mendelson and
+ Akira AJISAKA via atm)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java?rev=1572235&r1=1572234&r2=1572235&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java Wed Feb 26 20:28:41 2014
@@ -28,6 +28,9 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -41,10 +44,13 @@ import org.jets3t.service.S3ServiceExcep
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageObjectsChunk;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.MultipartPart;
+import org.jets3t.service.model.MultipartUpload;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import org.jets3t.service.security.AWSCredentials;
+import org.jets3t.service.utils.MultipartUtils;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -52,6 +58,12 @@ class Jets3tNativeFileSystemStore implem
private S3Service s3Service;
private S3Bucket bucket;
+
+ private long multipartBlockSize;
+ private boolean multipartEnabled;
+ private long multipartCopyBlockSize;
+ static final long MAX_PART_SIZE = (long)5 * 1024 * 1024 * 1024;
+
public static final Log LOG =
LogFactory.getLog(Jets3tNativeFileSystemStore.class);
@@ -67,13 +79,27 @@ class Jets3tNativeFileSystemStore implem
} catch (S3ServiceException e) {
handleS3ServiceException(e);
}
+ multipartEnabled =
+ conf.getBoolean("fs.s3n.multipart.uploads.enabled", false);
+ multipartBlockSize = Math.min(
+ conf.getLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024),
+ MAX_PART_SIZE);
+ multipartCopyBlockSize = Math.min(
+ conf.getLong("fs.s3n.multipart.copy.block.size", MAX_PART_SIZE),
+ MAX_PART_SIZE);
+
bucket = new S3Bucket(uri.getHost());
}
@Override
public void storeFile(String key, File file, byte[] md5Hash)
throws IOException {
-
+
+ if (multipartEnabled && file.length() >= multipartBlockSize) {
+ storeLargeFile(key, file, md5Hash);
+ return;
+ }
+
BufferedInputStream in = null;
try {
in = new BufferedInputStream(new FileInputStream(file));
@@ -98,6 +124,31 @@ class Jets3tNativeFileSystemStore implem
}
}
+ /**
+ * Stores a local file under key using a jets3t multipart upload.
+ * storeFile delegates here when fs.s3n.multipart.uploads.enabled is set and
+ * the file is at least multipartBlockSize bytes long.
+ *
+ * @param key destination object key in the bucket
+ * @param file local file whose contents are uploaded
+ * @param md5Hash MD5 digest to attach to the object, or null to attach none
+ * @throws IOException jets3t ServiceExceptions are passed to
+ * handleServiceException; any other failure is wrapped in an S3Exception
+ */
+ public void storeLargeFile(String key, File file, byte[] md5Hash)
+ throws IOException {
+ // Describe the object; its data is read from the local file.
+ S3Object object = new S3Object(key);
+ object.setDataInputFile(file);
+ object.setContentType("binary/octet-stream");
+ object.setContentLength(file.length());
+ if (md5Hash != null) {
+ object.setMd5Hash(md5Hash);
+ }
+
+ // MultipartUtils works on a list of objects; this upload has just one.
+ List<StorageObject> objectsToUploadAsMultipart =
+ new ArrayList<StorageObject>();
+ objectsToUploadAsMultipart.add(object);
+ // Parts are sized by the configured fs.s3n.multipart.uploads.block.size.
+ MultipartUtils mpUtils = new MultipartUtils(multipartBlockSize);
+
+ try {
+ // NOTE(review): the trailing null is presumably the optional progress
+ // event listener of MultipartUtils.uploadObjects - confirm.
+ mpUtils.uploadObjects(bucket.getName(), s3Service,
+ objectsToUploadAsMultipart, null);
+ } catch (ServiceException e) {
+ handleServiceException(e);
+ } catch (Exception e) {
+ // Any other failure from the upload is wrapped in an S3Exception.
+ throw new S3Exception(e);
+ }
+ }
+
@Override
public void storeEmptyFile(String key) throws IOException {
try {
@@ -152,11 +203,8 @@ class Jets3tNativeFileSystemStore implem
}
S3Object object = s3Service.getObject(bucket.getName(), key);
return object.getDataInputStream();
- } catch (S3ServiceException e) {
- handleS3ServiceException(key, e);
- return null; //never returned - keep compiler happy
} catch (ServiceException e) {
- handleServiceException(e);
+ handleServiceException(key, e);
return null; //return null if key not found
}
}
@@ -180,11 +228,8 @@ class Jets3tNativeFileSystemStore implem
S3Object object = s3Service.getObject(bucket, key, null, null, null,
null, byteRangeStart, null);
return object.getDataInputStream();
- } catch (S3ServiceException e) {
- handleS3ServiceException(key, e);
- return null; //never returned - keep compiler happy
} catch (ServiceException e) {
- handleServiceException(e);
+ handleServiceException(key, e);
return null; //return null if key not found
}
}
@@ -244,8 +289,16 @@ class Jets3tNativeFileSystemStore implem
LOG.debug("Deleting key:" + key + "from bucket" + bucket.getName());
}
s3Service.deleteObject(bucket, key);
- } catch (S3ServiceException e) {
- handleS3ServiceException(key, e);
+ } catch (ServiceException e) {
+ handleServiceException(key, e);
+ }
+ }
+
+ /**
+ * Renames srcKey to dstKey within the bucket using jets3t renameObject.
+ * The source key is passed to the error handler, so an S3 "NoSuchKey"
+ * error is reported as a FileNotFoundException naming srcKey. This matches
+ * delete and copy.
+ *
+ * @param srcKey existing key to rename
+ * @param dstKey new key for the object
+ * @throws IOException if the rename fails
+ */
+ public void rename(String srcKey, String dstKey) throws IOException {
+ try {
+ s3Service.renameObject(bucket.getName(), srcKey, new S3Object(dstKey));
+ } catch (ServiceException e) {
+ handleServiceException(srcKey, e);
}
}
@@ -255,10 +308,52 @@ class Jets3tNativeFileSystemStore implem
if(LOG.isDebugEnabled()) {
LOG.debug("Copying srcKey: " + srcKey + "to dstKey: " + dstKey + "in bucket: " + bucket.getName());
}
+ if (multipartEnabled) {
+ S3Object object = s3Service.getObjectDetails(bucket, srcKey, null,
+ null, null, null);
+ if (multipartCopyBlockSize > 0 &&
+ object.getContentLength() > multipartCopyBlockSize) {
+ copyLargeFile(object, dstKey);
+ return;
+ }
+ }
s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
new S3Object(dstKey), false);
- } catch (S3ServiceException e) {
- handleS3ServiceException(srcKey, e);
+ } catch (ServiceException e) {
+ handleServiceException(srcKey, e);
+ }
+ }
+
+ /**
+ * Copies srcObject to dstKey within the same bucket as a multipart upload.
+ * Each part is a server-side copy of a consecutive byte range of the
+ * source, at most multipartCopyBlockSize bytes long.
+ * If the copy does not complete, the multipart upload is aborted so that
+ * parts already copied are not left behind as an incomplete upload.
+ *
+ * @param srcObject source object; its content length, key and metadata map
+ * are read here, so it must carry the object details
+ * @param dstKey destination key in the same bucket
+ * @throws IOException if an S3 call fails (see handleServiceException)
+ */
+ public void copyLargeFile(S3Object srcObject, String dstKey) throws IOException {
+ try {
+ long srcLength = srcObject.getContentLength();
+ // Ceiling division: a trailing partial block gets its own part.
+ long partCount = srcLength / multipartCopyBlockSize +
+ (srcLength % multipartCopyBlockSize > 0 ? 1 : 0);
+
+ // The destination is started with the source object's metadata map.
+ MultipartUpload multipartUpload = s3Service.multipartStartUpload
+ (bucket.getName(), dstKey, srcObject.getMetadataMap());
+
+ boolean completed = false;
+ try {
+ List<MultipartPart> listedParts = new ArrayList<MultipartPart>();
+ for (int i = 0; i < partCount; i++) {
+ long byteRangeStart = i * multipartCopyBlockSize;
+ // Every part is a full block except possibly the last one.
+ long byteLength =
+ Math.min(multipartCopyBlockSize, srcLength - byteRangeStart);
+
+ // Part numbers start at 1 and the byte range end is inclusive.
+ MultipartPart copiedPart = s3Service.multipartUploadPartCopy
+ (multipartUpload, i + 1, bucket.getName(), srcObject.getKey(),
+ null, null, null, null, byteRangeStart,
+ byteRangeStart + byteLength - 1, null);
+ listedParts.add(copiedPart);
+ }
+
+ // NOTE(review): the reason for reversing the part list is not visible
+ // here - confirm whether multipartCompleteUpload depends on this order.
+ Collections.reverse(listedParts);
+ s3Service.multipartCompleteUpload(multipartUpload, listedParts);
+ completed = true;
+ } finally {
+ if (!completed) {
+ // Abort the unfinished upload so its parts are not kept. An abort
+ // failure is only logged so it cannot hide the original error.
+ try {
+ s3Service.multipartAbortUpload(multipartUpload);
+ } catch (ServiceException abortError) {
+ LOG.warn("Failed to abort multipart copy to " + dstKey, abortError);
+ }
+ }
+ }
} catch (ServiceException e) {
handleServiceException(e);
}
@@ -291,11 +386,11 @@ class Jets3tNativeFileSystemStore implem
System.out.println(sb);
}
- private void handleS3ServiceException(String key, S3ServiceException e) throws IOException {
- if ("NoSuchKey".equals(e.getS3ErrorCode())) {
+ /**
+ * Converts a jets3t ServiceException raised while accessing key into an
+ * IOException. An S3 "NoSuchKey" error code becomes a FileNotFoundException
+ * that names the key; every other error is passed to the single-argument
+ * handleServiceException(ServiceException).
+ *
+ * @param key the object key the failed operation was working on
+ * @param e the exception thrown by jets3t
+ * @throws IOException always, either directly or via the delegate
+ */
+ private void handleServiceException(String key, ServiceException e) throws IOException {
+ if ("NoSuchKey".equals(e.getErrorCode())) {
throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
} else {
- handleS3ServiceException(e);
+ handleServiceException(e);
}
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml?rev=1572235&r1=1572234&r2=1572235&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml Wed Feb 26 20:28:41 2014
@@ -533,6 +533,31 @@
</property>
<property>
+ <name>fs.s3n.multipart.uploads.enabled</name>
+ <value>false</value>
+ <description>Setting this property to true enables multipart uploads to
+ the native S3 filesystem. When uploading a file, it is split into blocks
+ if its size is equal to or larger than fs.s3n.multipart.uploads.block.size.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3n.multipart.uploads.block.size</name>
+ <value>67108864</value>
+ <description>The block size for multipart uploads to native S3 filesystem.
+ Default size is 64MB.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3n.multipart.copy.block.size</name>
+ <value>5368709120</value>
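+ <description>The block size for multipart copy in the native S3 filesystem.
+ When fs.s3n.multipart.uploads.enabled is true, objects larger than this
+ size are copied in parts of this size. Default size is 5GB.
+ </description>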
+</property>
+
+<property>
<name>io.seqfile.compress.blocksize</name>
<value>1000000</value>
<description>The minimum block size for compression in block compressed
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java?rev=1572235&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java Wed Feb 26 20:28:41 2014
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.DigestInputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+
+/**
+ * Live tests of multipart upload and multipart copy in
+ * {@link Jets3tNativeFileSystemStore}. They are skipped unless
+ * fs.s3n.awsAccessKeyId, fs.s3n.awsSecretAccessKey and test.fs.s3n.name
+ * are set in the test configuration.
+ */
+public class TestJets3tNativeFileSystemStore {
+ private Configuration conf;
+ private Jets3tNativeFileSystemStore store;
+ private NativeS3FileSystem fs;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new Configuration();
+ store = new Jets3tNativeFileSystemStore();
+ fs = new NativeS3FileSystem(store);
+ // Enable multipart uploads with a 64 MB upload block size.
+ conf.setBoolean("fs.s3n.multipart.uploads.enabled", true);
+ conf.setLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024);
+ fs.initialize(URI.create(conf.get("test.fs.s3n.name")), conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ store.purge("test");
+ } catch (Exception ignored) {
+ // Best-effort cleanup: a failure here must not mask the test result.
+ }
+ }
+
+ @BeforeClass
+ public static void checkSettings() throws Exception {
+ Configuration conf = new Configuration();
+ assumeNotNull(conf.get("fs.s3n.awsAccessKeyId"));
+ assumeNotNull(conf.get("fs.s3n.awsSecretAccessKey"));
+ assumeNotNull(conf.get("test.fs.s3n.name"));
+ }
+
+ /**
+ * Writes len bytes to path, renames it to path + ".copy", then reads the
+ * copy back and checks that its length and MD5 digest match what was
+ * written.
+ */
+ protected void writeRenameReadCompare(Path path, long len)
+ throws IOException, NoSuchAlgorithmException {
+ // If len >= fs.s3n.multipart.uploads.block.size,
+ // the file is stored with a multipart upload
+ MessageDigest digest = MessageDigest.getInstance("MD5");
+ OutputStream out = new BufferedOutputStream(
+ new DigestOutputStream(fs.create(path, false), digest));
+ for (long i = 0; i < len; i++) {
+ out.write('Q');
+ }
+ out.flush();
+ out.close();
+
+ assertTrue("Exists", fs.exists(path));
+
+ // Depending on if this file is over 5 GB or not,
+ // rename will cause a multipart upload copy
+ Path copyPath = path.suffix(".copy");
+ fs.rename(path, copyPath);
+
+ assertTrue("Copy exists", fs.exists(copyPath));
+
+ // Download file from S3 and compare the digest against the original
+ MessageDigest digest2 = MessageDigest.getInstance("MD5");
+ InputStream in = new BufferedInputStream(
+ new DigestInputStream(fs.open(copyPath), digest2));
+ long copyLen = 0;
+ while (in.read() != -1) {copyLen++;}
+ in.close();
+
+ assertEquals("Copy length matches original", len, copyLen);
+ assertArrayEquals("Digests match", digest.digest(), digest2.digest());
+ }
+
+ @Test
+ public void testSmallUpload() throws IOException, NoSuchAlgorithmException {
+ // Regular upload, regular copy
+ writeRenameReadCompare(new Path("/test/small"), 16384);
+ }
+
+ @Test
+ public void testMediumUpload() throws IOException, NoSuchAlgorithmException {
+ // Multipart upload (above the 64 MB upload block size), regular copy
+ writeRenameReadCompare(new Path("/test/medium"), 104857600); // 100 MB
+ }
+
+ @Test
+ public void testExtraLargeUpload()
+ throws IOException, NoSuchAlgorithmException {
+ // Multipart upload, multipart copy
+ writeRenameReadCompare(new Path("/test/xlarge"), 5368709121L); // 5GB+1byte
+ }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties?rev=1572235&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties Wed Feb 26 20:28:41 2014
@@ -0,0 +1,16 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Speed up the s3native jets3t test
+
+s3service.max-thread-count=10
+threaded-service.max-thread-count=10