Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/02/27 15:15:20 UTC
[24/31] hadoop git commit: HADOOP-14028. S3A BlockOutputStreams
doesn't delete temporary files in multipart uploads or handle part upload
failures. Contributed by Steve Loughran.
HADOOP-14028. S3A BlockOutputStreams doesn't delete temporary files in multipart uploads or handle part upload failures.
Contributed by Steve Loughran.
(cherry picked from commit 29fe5af017b945d8750c074ca39031b5b777eddd)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dab00da1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dab00da1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dab00da1
Branch: refs/heads/HADOOP-13345
Commit: dab00da19f25619ccc71c7f803a235b21766bf1e
Parents: 120bef7
Author: Steve Loughran <st...@apache.org>
Authored: Sat Feb 25 15:35:19 2017 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Sat Feb 25 15:35:19 2017 +0000
----------------------------------------------------------------------
.../hadoop/fs/s3a/S3ABlockOutputStream.java | 68 ++-
.../org/apache/hadoop/fs/s3a/S3ADataBlocks.java | 514 +++++++++++--------
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 78 ++-
.../hadoop/fs/s3a/S3AInstrumentation.java | 40 +-
.../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 26 +
.../hadoop/fs/s3a/ITestS3ABlockOutputArray.java | 76 ++-
.../fs/s3a/ITestS3ABlockOutputByteBuffer.java | 5 +-
.../hadoop/fs/s3a/ITestS3ABlockOutputDisk.java | 12 +
.../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 13 +
.../apache/hadoop/fs/s3a/TestDataBlocks.java | 45 +-
.../fs/s3a/scale/AbstractSTestS3AHugeFiles.java | 28 +-
11 files changed, 639 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 89b9b29..1b0929b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.fs.s3a;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.util.Progressable;
@@ -178,10 +176,10 @@ class S3ABlockOutputStream extends OutputStream {
if (activeBlock == null) {
blockCount++;
if (blockCount >= Constants.MAX_MULTIPART_COUNT) {
- LOG.error("Number of partitions in stream exceeds limit for S3: " +
+ LOG.error("Number of partitions in stream exceeds limit for S3: "
+ Constants.MAX_MULTIPART_COUNT + "; write may fail.");
}
- activeBlock = blockFactory.create(this.blockSize);
+ activeBlock = blockFactory.create(blockCount, this.blockSize, statistics);
}
return activeBlock;
}
@@ -206,7 +204,9 @@ class S3ABlockOutputStream extends OutputStream {
* Clear the active block.
*/
private void clearActiveBlock() {
- LOG.debug("Clearing active block");
+ if (activeBlock != null) {
+ LOG.debug("Clearing active block");
+ }
synchronized (this) {
activeBlock = null;
}
@@ -356,11 +356,9 @@ class S3ABlockOutputStream extends OutputStream {
writeOperationHelper.writeFailed(ioe);
throw ioe;
} finally {
- LOG.debug("Closing block and factory");
- IOUtils.closeStream(block);
- IOUtils.closeStream(blockFactory);
+ closeAll(LOG, block, blockFactory);
LOG.debug("Statistics: {}", statistics);
- IOUtils.closeStream(statistics);
+ closeAll(LOG, statistics);
clearActiveBlock();
}
// All end of write operations, including deleting fake parent directories
@@ -378,10 +376,10 @@ class S3ABlockOutputStream extends OutputStream {
final S3ADataBlocks.DataBlock block = getActiveBlock();
int size = block.dataSize();
- final PutObjectRequest putObjectRequest =
- writeOperationHelper.newPutRequest(
- block.startUpload(),
- size);
+ final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
+ final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
+ writeOperationHelper.newPutRequest(uploadData.getFile())
+ : writeOperationHelper.newPutRequest(uploadData.getUploadStream(), size);
fs.setOptionalPutRequestParameters(putObjectRequest);
long transferQueueTime = now();
BlockUploadProgress callback =
@@ -393,8 +391,14 @@ class S3ABlockOutputStream extends OutputStream {
executorService.submit(new Callable<PutObjectResult>() {
@Override
public PutObjectResult call() throws Exception {
- PutObjectResult result = fs.putObjectDirect(putObjectRequest);
- block.close();
+ PutObjectResult result;
+ try {
+ // the putObject call automatically closes the input
+ // stream afterwards.
+ result = writeOperationHelper.putObject(putObjectRequest);
+ } finally {
+ closeAll(LOG, uploadData, block);
+ }
return result;
}
});
@@ -438,13 +442,21 @@ class S3ABlockOutputStream extends OutputStream {
}
/**
+ * Get the statistics for this stream.
+ * @return stream statistics
+ */
+ S3AInstrumentation.OutputStreamStatistics getStatistics() {
+ return statistics;
+ }
+
+ /**
* Multiple partition upload.
*/
private class MultiPartUpload {
private final String uploadId;
private final List<ListenableFuture<PartETag>> partETagsFutures;
- public MultiPartUpload() throws IOException {
+ MultiPartUpload() throws IOException {
this.uploadId = writeOperationHelper.initiateMultiPartUpload();
this.partETagsFutures = new ArrayList<>(2);
LOG.debug("Initiated multi-part upload for {} with " +
@@ -461,14 +473,16 @@ class S3ABlockOutputStream extends OutputStream {
throws IOException {
LOG.debug("Queueing upload of {}", block);
final int size = block.dataSize();
- final InputStream uploadStream = block.startUpload();
+ final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
final int currentPartNumber = partETagsFutures.size() + 1;
final UploadPartRequest request =
writeOperationHelper.newUploadPartRequest(
uploadId,
- uploadStream,
currentPartNumber,
- size);
+ size,
+ uploadData.getUploadStream(),
+ uploadData.getFile());
+
long transferQueueTime = now();
BlockUploadProgress callback =
new BlockUploadProgress(
@@ -483,12 +497,16 @@ class S3ABlockOutputStream extends OutputStream {
LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
uploadId);
// do the upload
- PartETag partETag = fs.uploadPart(request).getPartETag();
- LOG.debug("Completed upload of {}", block);
- LOG.debug("Stream statistics of {}", statistics);
-
- // close the block
- block.close();
+ PartETag partETag;
+ try {
+ partETag = fs.uploadPart(request).getPartETag();
+ LOG.debug("Completed upload of {} to part {}", block,
+ partETag.getETag());
+ LOG.debug("Stream statistics of {}", statistics);
+ } finally {
+ // close the stream and block
+ closeAll(LOG, uploadData, block);
+ }
return partETag;
}
});
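
Taken together, the S3ABlockOutputStream hunks change the upload contract:
startUpload() now returns a BlockUploadData instead of a raw InputStream, the
PUT request is built from whichever source it carries, and both the upload
data and the block are closed in a finally clause so buffers and temporary
files are released even when the upload fails. A minimal sketch of that path,
using only names visible in the hunks above (not the committed code itself):

    // Sketch: single-object PUT of the active block.
    final int size = block.dataSize();
    final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
    try {
      PutObjectRequest put = uploadData.hasFile()
          ? writeOperationHelper.newPutRequest(uploadData.getFile())
          : writeOperationHelper.newPutRequest(
              uploadData.getUploadStream(), size);
      // putObject() closes any stream held inside the request
      writeOperationHelper.putObject(put);
    } finally {
      // release the buffer or delete the temp file, success or failure
      closeAll(LOG, uploadData, block);
    }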
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
index 05f8efe..9bc8dcd 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
@@ -24,10 +24,8 @@ import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
-import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -42,10 +40,11 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.util.DirectBufferPool;
import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.closeAll;
/**
* Set of classes to support output streaming into blocks which are then
- * uploaded as partitions.
+ * uploaded to S3 as a single PUT, or as part of a multipart request.
*/
final class S3ADataBlocks {
@@ -97,6 +96,70 @@ final class S3ADataBlocks {
}
/**
+ * The output information for an upload.
+ * It can be one of a file or an input stream.
+ * When closed, any stream is closed. Any source file is untouched.
+ */
+ static final class BlockUploadData implements Closeable {
+ private final File file;
+ private final InputStream uploadStream;
+
+ /**
+ * File constructor; input stream will be null.
+ * @param file file to upload
+ */
+ BlockUploadData(File file) {
+ Preconditions.checkArgument(file.exists(), "No file: " + file);
+ this.file = file;
+ this.uploadStream = null;
+ }
+
+ /**
+ * Stream constructor, file field will be null.
+ * @param uploadStream stream to upload
+ */
+ BlockUploadData(InputStream uploadStream) {
+ Preconditions.checkNotNull(uploadStream, "rawUploadStream");
+ this.uploadStream = uploadStream;
+ this.file = null;
+ }
+
+ /**
+ * Predicate: does this instance contain a file reference.
+ * @return true if there is a file.
+ */
+ boolean hasFile() {
+ return file != null;
+ }
+
+ /**
+ * Get the file, if there is one.
+ * @return the file for uploading, or null.
+ */
+ File getFile() {
+ return file;
+ }
+
+ /**
+ * Get the raw upload stream, if the object was
+ * created with one.
+ * @return the upload stream or null.
+ */
+ InputStream getUploadStream() {
+ return uploadStream;
+ }
+
+ /**
+ * Close: closes any upload stream provided in the constructor.
+ * @throws IOException inherited exception
+ */
+ @Override
+ public void close() throws IOException {
+ closeAll(LOG, uploadStream);
+ }
+ }
+
+ /**
* Base class for block factories.
*/
static abstract class BlockFactory implements Closeable {
@@ -110,15 +173,21 @@ final class S3ADataBlocks {
/**
* Create a block.
+ *
+ * @param index index of block
* @param limit limit of the block.
+ * @param statistics stats to work with
* @return a new block.
*/
- abstract DataBlock create(int limit) throws IOException;
+ abstract DataBlock create(long index, int limit,
+ S3AInstrumentation.OutputStreamStatistics statistics)
+ throws IOException;
/**
* Implement any close/cleanup operation.
* Base class is a no-op
- * @throws IOException -ideally, it shouldn't.
+ * @throws IOException Inherited exception; implementations should
+ * avoid raising it.
*/
@Override
public void close() throws IOException {
@@ -140,6 +209,14 @@ final class S3ADataBlocks {
enum DestState {Writing, Upload, Closed}
private volatile DestState state = Writing;
+ protected final long index;
+ protected final S3AInstrumentation.OutputStreamStatistics statistics;
+
+ protected DataBlock(long index,
+ S3AInstrumentation.OutputStreamStatistics statistics) {
+ this.index = index;
+ this.statistics = statistics;
+ }
/**
* Atomically enter a state, verifying current state.
@@ -243,8 +320,8 @@ final class S3ADataBlocks {
* @return the stream
* @throws IOException trouble
*/
- InputStream startUpload() throws IOException {
- LOG.debug("Start datablock upload");
+ BlockUploadData startUpload() throws IOException {
+ LOG.debug("Start datablock[{}] upload", index);
enterState(Writing, Upload);
return null;
}
@@ -278,6 +355,23 @@ final class S3ADataBlocks {
}
+ /**
+ * A block has been allocated.
+ */
+ protected void blockAllocated() {
+ if (statistics != null) {
+ statistics.blockAllocated();
+ }
+ }
+
+ /**
+ * A block has been released.
+ */
+ protected void blockReleased() {
+ if (statistics != null) {
+ statistics.blockReleased();
+ }
+ }
}
// ====================================================================
@@ -292,8 +386,10 @@ final class S3ADataBlocks {
}
@Override
- DataBlock create(int limit) throws IOException {
- return new ByteArrayBlock(limit);
+ DataBlock create(long index, int limit,
+ S3AInstrumentation.OutputStreamStatistics statistics)
+ throws IOException {
+ return new ByteArrayBlock(index, limit, statistics);
}
}
@@ -334,9 +430,13 @@ final class S3ADataBlocks {
// cache data size so that it is consistent after the buffer is reset.
private Integer dataSize;
- ByteArrayBlock(int limit) {
+ ByteArrayBlock(long index,
+ int limit,
+ S3AInstrumentation.OutputStreamStatistics statistics) {
+ super(index, statistics);
this.limit = limit;
buffer = new S3AByteArrayOutputStream(limit);
+ blockAllocated();
}
/**
@@ -349,12 +449,12 @@ final class S3ADataBlocks {
}
@Override
- InputStream startUpload() throws IOException {
+ BlockUploadData startUpload() throws IOException {
super.startUpload();
dataSize = buffer.size();
ByteArrayInputStream bufferData = buffer.getInputStream();
buffer = null;
- return bufferData;
+ return new BlockUploadData(bufferData);
}
@Override
@@ -378,12 +478,14 @@ final class S3ADataBlocks {
@Override
protected void innerClose() {
buffer = null;
+ blockReleased();
}
@Override
public String toString() {
- return "ByteArrayBlock{" +
- "state=" + getState() +
+ return "ByteArrayBlock{"
+ +"index=" + index +
+ ", state=" + getState() +
", limit=" + limit +
", dataSize=" + dataSize +
'}';
@@ -395,12 +497,6 @@ final class S3ADataBlocks {
/**
* Stream via Direct ByteBuffers; these are allocated off heap
* via {@link DirectBufferPool}.
- * This is actually the most complex of all the block factories,
- * due to the need to explicitly recycle buffers; in comparison, the
- * {@link DiskBlock} buffer delegates the work of deleting files to
- * the {@link DiskBlock.FileDeletingInputStream}. Here the
- * input stream {@link ByteBufferInputStream} has a similar task, along
- * with the foundational work of streaming data from a byte array.
*/
static class ByteBufferBlockFactory extends BlockFactory {
@@ -413,8 +509,10 @@ final class S3ADataBlocks {
}
@Override
- ByteBufferBlock create(int limit) throws IOException {
- return new ByteBufferBlock(limit);
+ ByteBufferBlock create(long index, int limit,
+ S3AInstrumentation.OutputStreamStatistics statistics)
+ throws IOException {
+ return new ByteBufferBlock(index, limit, statistics);
}
private ByteBuffer requestBuffer(int limit) {
@@ -446,21 +544,27 @@ final class S3ADataBlocks {
/**
* A DataBlock which requests a buffer from pool on creation; returns
- * it when the output stream is closed.
+ * it when it is closed.
*/
class ByteBufferBlock extends DataBlock {
- private ByteBuffer buffer;
+ private ByteBuffer blockBuffer;
private final int bufferSize;
// cache data size so that it is consistent after the buffer is reset.
private Integer dataSize;
/**
* Instantiate. This will request a ByteBuffer of the desired size.
+ * @param index block index
* @param bufferSize buffer size
+ * @param statistics statistics to update
*/
- ByteBufferBlock(int bufferSize) {
+ ByteBufferBlock(long index,
+ int bufferSize,
+ S3AInstrumentation.OutputStreamStatistics statistics) {
+ super(index, statistics);
this.bufferSize = bufferSize;
- buffer = requestBuffer(bufferSize);
+ blockBuffer = requestBuffer(bufferSize);
+ blockAllocated();
}
/**
@@ -473,13 +577,14 @@ final class S3ADataBlocks {
}
@Override
- ByteBufferInputStream startUpload() throws IOException {
+ BlockUploadData startUpload() throws IOException {
super.startUpload();
dataSize = bufferCapacityUsed();
// set the buffer up from reading from the beginning
- buffer.limit(buffer.position());
- buffer.position(0);
- return new ByteBufferInputStream(dataSize, buffer);
+ blockBuffer.limit(blockBuffer.position());
+ blockBuffer.position(0);
+ return new BlockUploadData(
+ new ByteBufferInputStream(dataSize, blockBuffer));
}
@Override
@@ -489,182 +594,190 @@ final class S3ADataBlocks {
@Override
public int remainingCapacity() {
- return buffer != null ? buffer.remaining() : 0;
+ return blockBuffer != null ? blockBuffer.remaining() : 0;
}
private int bufferCapacityUsed() {
- return buffer.capacity() - buffer.remaining();
+ return blockBuffer.capacity() - blockBuffer.remaining();
}
@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = Math.min(remainingCapacity(), len);
- buffer.put(b, offset, written);
+ blockBuffer.put(b, offset, written);
return written;
}
+ /**
+ * Closing the block will release the buffer.
+ */
@Override
protected void innerClose() {
- buffer = null;
+ if (blockBuffer != null) {
+ blockReleased();
+ releaseBuffer(blockBuffer);
+ blockBuffer = null;
+ }
}
@Override
public String toString() {
return "ByteBufferBlock{"
- + "state=" + getState() +
+ + "index=" + index +
+ ", state=" + getState() +
", dataSize=" + dataSize() +
", limit=" + bufferSize +
", remainingCapacity=" + remainingCapacity() +
'}';
}
- }
-
- /**
- * Provide an input stream from a byte buffer; supporting
- * {@link #mark(int)}, which is required to enable replay of failed
- * PUT attempts.
- * This input stream returns the buffer to the pool afterwards.
- */
- class ByteBufferInputStream extends InputStream {
+ /**
+ * Provide an input stream from a byte buffer; supporting
+ * {@link #mark(int)}, which is required to enable replay of failed
+ * PUT attempts.
+ */
+ class ByteBufferInputStream extends InputStream {
- private final int size;
- private ByteBuffer byteBuffer;
+ private final int size;
+ private ByteBuffer byteBuffer;
- ByteBufferInputStream(int size, ByteBuffer byteBuffer) {
- LOG.debug("Creating ByteBufferInputStream of size {}", size);
- this.size = size;
- this.byteBuffer = byteBuffer;
- }
+ ByteBufferInputStream(int size,
+ ByteBuffer byteBuffer) {
+ LOG.debug("Creating ByteBufferInputStream of size {}", size);
+ this.size = size;
+ this.byteBuffer = byteBuffer;
+ }
- /**
- * Return the buffer to the pool after the stream is closed.
- */
- @Override
- public synchronized void close() {
- if (byteBuffer != null) {
- LOG.debug("releasing buffer");
- releaseBuffer(byteBuffer);
+ /**
+ * After the stream is closed, set the local reference to the byte
+ * buffer to null; this guarantees that future attempts to use
+ * stream methods will fail.
+ */
+ @Override
+ public synchronized void close() {
+ LOG.debug("ByteBufferInputStream.close() for {}",
+ ByteBufferBlock.super.toString());
byteBuffer = null;
}
- }
- /**
- * Verify that the stream is open.
- * @throws IOException if the stream is closed
- */
- private void verifyOpen() throws IOException {
- if (byteBuffer == null) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ /**
+ * Verify that the stream is open.
+ * @throws IOException if the stream is closed
+ */
+ private void verifyOpen() throws IOException {
+ if (byteBuffer == null) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
}
- }
- public synchronized int read() throws IOException {
- if (available() > 0) {
- return byteBuffer.get() & 0xFF;
- } else {
- return -1;
+ public synchronized int read() throws IOException {
+ if (available() > 0) {
+ return byteBuffer.get() & 0xFF;
+ } else {
+ return -1;
+ }
}
- }
- @Override
- public synchronized long skip(long offset) throws IOException {
- verifyOpen();
- long newPos = position() + offset;
- if (newPos < 0) {
- throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+ @Override
+ public synchronized long skip(long offset) throws IOException {
+ verifyOpen();
+ long newPos = position() + offset;
+ if (newPos < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+ }
+ if (newPos > size) {
+ throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ }
+ byteBuffer.position((int) newPos);
+ return newPos;
}
- if (newPos > size) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+
+ @Override
+ public synchronized int available() {
+ Preconditions.checkState(byteBuffer != null,
+ FSExceptionMessages.STREAM_IS_CLOSED);
+ return byteBuffer.remaining();
}
- byteBuffer.position((int) newPos);
- return newPos;
- }
- @Override
- public synchronized int available() {
- Preconditions.checkState(byteBuffer != null,
- FSExceptionMessages.STREAM_IS_CLOSED);
- return byteBuffer.remaining();
- }
+ /**
+ * Get the current buffer position.
+ * @return the buffer position
+ */
+ public synchronized int position() {
+ return byteBuffer.position();
+ }
- /**
- * Get the current buffer position.
- * @return the buffer position
- */
- public synchronized int position() {
- return byteBuffer.position();
- }
+ /**
+ * Check if there is data left.
+ * @return true if there is data remaining in the buffer.
+ */
+ public synchronized boolean hasRemaining() {
+ return byteBuffer.hasRemaining();
+ }
- /**
- * Check if there is data left.
- * @return true if there is data remaining in the buffer.
- */
- public synchronized boolean hasRemaining() {
- return byteBuffer.hasRemaining();
- }
+ @Override
+ public synchronized void mark(int readlimit) {
+ LOG.debug("mark at {}", position());
+ byteBuffer.mark();
+ }
- @Override
- public synchronized void mark(int readlimit) {
- LOG.debug("mark at {}", position());
- byteBuffer.mark();
- }
+ @Override
+ public synchronized void reset() throws IOException {
+ LOG.debug("reset");
+ byteBuffer.reset();
+ }
- @Override
- public synchronized void reset() throws IOException {
- LOG.debug("reset");
- byteBuffer.reset();
- }
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
- @Override
- public boolean markSupported() {
- return true;
- }
+ /**
+ * Read in data.
+ * @param b destination buffer
+ * @param offset offset within the buffer
+ * @param length length of bytes to read
+ * @throws EOFException if the position is negative
+ * @throws IndexOutOfBoundsException if there isn't space for the
+ * amount of data requested.
+ * @throws IllegalArgumentException other arguments are invalid.
+ */
+ @SuppressWarnings("NullableProblems")
+ public synchronized int read(byte[] b, int offset, int length)
+ throws IOException {
+ Preconditions.checkArgument(length >= 0, "length is negative");
+ Preconditions.checkArgument(b != null, "Null buffer");
+ if (b.length - offset < length) {
+ throw new IndexOutOfBoundsException(
+ FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+ + ": request length =" + length
+ + ", with offset =" + offset
+ + "; buffer capacity =" + (b.length - offset));
+ }
+ verifyOpen();
+ if (!hasRemaining()) {
+ return -1;
+ }
- /**
- * Read in data.
- * @param buffer destination buffer
- * @param offset offset within the buffer
- * @param length length of bytes to read
- * @throws EOFException if the position is negative
- * @throws IndexOutOfBoundsException if there isn't space for the
- * amount of data requested.
- * @throws IllegalArgumentException other arguments are invalid.
- */
- @SuppressWarnings("NullableProblems")
- public synchronized int read(byte[] buffer, int offset, int length)
- throws IOException {
- Preconditions.checkArgument(length >= 0, "length is negative");
- Preconditions.checkArgument(buffer != null, "Null buffer");
- if (buffer.length - offset < length) {
- throw new IndexOutOfBoundsException(
- FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
- + ": request length =" + length
- + ", with offset =" + offset
- + "; buffer capacity =" + (buffer.length - offset));
+ int toRead = Math.min(length, available());
+ byteBuffer.get(b, offset, toRead);
+ return toRead;
}
- verifyOpen();
- if (!hasRemaining()) {
- return -1;
- }
-
- int toRead = Math.min(length, available());
- byteBuffer.get(buffer, offset, toRead);
- return toRead;
- }
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder(
- "ByteBufferInputStream{");
- sb.append("size=").append(size);
- ByteBuffer buffer = this.byteBuffer;
- if (buffer != null) {
- sb.append(", available=").append(buffer.remaining());
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "ByteBufferInputStream{");
+ sb.append("size=").append(size);
+ ByteBuffer buf = this.byteBuffer;
+ if (buf != null) {
+ sb.append(", available=").append(buf.remaining());
+ }
+ sb.append(", ").append(ByteBufferBlock.super.toString());
+ sb.append('}');
+ return sb.toString();
}
- sb.append('}');
- return sb.toString();
}
}
}
@@ -681,22 +794,29 @@ final class S3ADataBlocks {
}
/**
- * Create a temp file and a block which writes to it.
+ * Create a temp file and a {@link DiskBlock} instance to manage it.
+ *
+ * @param index block index
* @param limit limit of the block.
+ * @param statistics statistics to update
* @return the new block
* @throws IOException IO problems
*/
@Override
- DataBlock create(int limit) throws IOException {
+ DataBlock create(long index,
+ int limit,
+ S3AInstrumentation.OutputStreamStatistics statistics)
+ throws IOException {
File destFile = getOwner()
- .createTmpFileForWrite("s3ablock", limit, getOwner().getConf());
- return new DiskBlock(destFile, limit);
+ .createTmpFileForWrite(String.format("s3ablock-%04d-", index),
+ limit, getOwner().getConf());
+ return new DiskBlock(destFile, limit, index, statistics);
}
}
/**
* Stream to a file.
- * This will stop at the limit; the caller is expected to create a new block
+ * This will stop at the limit; the caller is expected to create a new block.
*/
static class DiskBlock extends DataBlock {
@@ -704,12 +824,17 @@ final class S3ADataBlocks {
private final File bufferFile;
private final int limit;
private BufferedOutputStream out;
- private InputStream uploadStream;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
- DiskBlock(File bufferFile, int limit)
+ DiskBlock(File bufferFile,
+ int limit,
+ long index,
+ S3AInstrumentation.OutputStreamStatistics statistics)
throws FileNotFoundException {
+ super(index, statistics);
this.limit = limit;
this.bufferFile = bufferFile;
+ blockAllocated();
out = new BufferedOutputStream(new FileOutputStream(bufferFile));
}
@@ -738,7 +863,7 @@ final class S3ADataBlocks {
}
@Override
- InputStream startUpload() throws IOException {
+ BlockUploadData startUpload() throws IOException {
super.startUpload();
try {
out.flush();
@@ -746,8 +871,7 @@ final class S3ADataBlocks {
out.close();
out = null;
}
- uploadStream = new FileInputStream(bufferFile);
- return new FileDeletingInputStream(uploadStream);
+ return new BlockUploadData(bufferFile);
}
/**
@@ -755,6 +879,7 @@ final class S3ADataBlocks {
* exists.
* @throws IOException IO problems
*/
+ @SuppressWarnings("UnnecessaryDefault")
@Override
protected void innerClose() throws IOException {
final DestState state = getState();
@@ -763,20 +888,19 @@ final class S3ADataBlocks {
case Writing:
if (bufferFile.exists()) {
// file was not uploaded
- LOG.debug("Deleting buffer file as upload did not start");
- boolean deleted = bufferFile.delete();
- if (!deleted && bufferFile.exists()) {
- LOG.warn("Failed to delete buffer file {}", bufferFile);
- }
+ LOG.debug("Block[{}]: Deleting buffer file as upload did not start",
+ index);
+ closeBlock();
}
break;
case Upload:
- LOG.debug("Buffer file {} exists \u2014close upload stream", bufferFile);
+ LOG.debug("Block[{}]: Buffer file {} exists \u2014close upload stream",
+ index, bufferFile);
break;
case Closed:
- // no-op
+ closeBlock();
break;
default:
@@ -798,7 +922,8 @@ final class S3ADataBlocks {
@Override
public String toString() {
String sb = "FileBlock{"
- + "destFile=" + bufferFile +
+ + "index=" + index
+ + ", destFile=" + bufferFile +
", state=" + getState() +
", dataSize=" + dataSize() +
", limit=" + limit +
@@ -807,31 +932,20 @@ final class S3ADataBlocks {
}
/**
- * An input stream which deletes the buffer file when closed.
+ * Close the block.
+ * This will delete the block's buffer file if the block has
+ * not previously been closed.
*/
- private final class FileDeletingInputStream extends FilterInputStream {
- private final AtomicBoolean closed = new AtomicBoolean(false);
-
- FileDeletingInputStream(InputStream source) {
- super(source);
- }
-
- /**
- * Delete the input file when closed.
- * @throws IOException IO problem
- */
- @Override
- public void close() throws IOException {
- try {
- super.close();
- } finally {
- if (!closed.getAndSet(true)) {
- if (!bufferFile.delete()) {
- LOG.warn("delete({}) returned false",
- bufferFile.getAbsoluteFile());
- }
- }
+ void closeBlock() {
+ LOG.debug("block[{}]: closeBlock()", index);
+ if (!closed.getAndSet(true)) {
+ blockReleased();
+ if (!bufferFile.delete() && bufferFile.exists()) {
+ LOG.warn("delete({}) returned false",
+ bufferFile.getAbsoluteFile());
}
+ } else {
+ LOG.debug("block[{}]: skipping re-entrant closeBlock()", index);
}
}
}
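
All three factories now share one block lifecycle: create with an index and
statistics, write until the limit is reached, convert to a BlockUploadData
for upload, then close, which frees the buffer or deletes the temporary file
and updates the allocated/released counters. A sketch of that lifecycle,
assuming factory, statistics, limit and data are in scope, with putFile()/
putStream() standing in for the actual PUT calls:

    // Sketch: common DataBlock lifecycle with the new create() signature.
    S3ADataBlocks.DataBlock block = factory.create(1, limit, statistics);
    block.write(data, 0, data.length);                          // state: Writing
    S3ADataBlocks.BlockUploadData upload = block.startUpload(); // -> Upload
    try {
      if (upload.hasFile()) {
        putFile(upload.getFile());            // disk block: SDK reads the file
      } else {
        putStream(upload.getUploadStream());  // array/bytebuffer block
      }
    } finally {
      closeAll(LOG, upload, block);   // -> Closed; buffer freed / file deleted
    }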
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index bc47918..1786e68 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1022,6 +1022,7 @@ public class S3AFileSystem extends FileSystem {
*/
public PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, File srcfile) {
+ Preconditions.checkNotNull(srcfile);
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
srcfile);
setOptionalPutRequestParameters(putObjectRequest);
@@ -1039,8 +1040,9 @@ public class S3AFileSystem extends FileSystem {
* @param inputStream source data.
* @return the request
*/
- PutObjectRequest newPutObjectRequest(String key,
+ private PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, InputStream inputStream) {
+ Preconditions.checkNotNull(inputStream);
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
inputStream, metadata);
setOptionalPutRequestParameters(putObjectRequest);
@@ -1077,12 +1079,16 @@ public class S3AFileSystem extends FileSystem {
}
/**
- * PUT an object, incrementing the put requests and put bytes
+ * Start a transfer-manager managed async PUT of an object,
+ * incrementing the put requests and put bytes
* counters.
* It does not update the other counters,
* as existing code does that as progress callbacks come in.
* Byte length is calculated from the file length, or, if there is no
* file, from the content length of the header.
+ * Because the operation is async, any stream supplied in the request
+ * must reference data (files, buffers) which stay valid until the upload
+ * completes.
* @param putObjectRequest the request
* @return the upload initiated
*/
@@ -1108,6 +1114,7 @@ public class S3AFileSystem extends FileSystem {
* PUT an object directly (i.e. not via the transfer manager).
* Byte length is calculated from the file length, or, if there is no
* file, from the content length of the header.
+ * <i>Important: this call will close any input stream in the request.</i>
* @param putObjectRequest the request
* @return the upload initiated
* @throws AmazonClientException on problems
@@ -1133,7 +1140,8 @@ public class S3AFileSystem extends FileSystem {
/**
* Upload part of a multi-partition file.
- * Increments the write and put counters
+ * Increments the write and put counters.
+ * <i>Important: this call does not close any input stream in the request.</i>
* @param request request
* @return the result of the operation.
* @throws AmazonClientException on problems
@@ -2309,14 +2317,28 @@ public class S3AFileSystem extends FileSystem {
/**
* Create a {@link PutObjectRequest} request.
- * The metadata is assumed to have been configured with the size of the
- * operation.
+ * If {@code length} is set, the metadata is configured with the size of
+ * the upload.
* @param inputStream source data.
* @param length size, if known. Use -1 for not known
* @return the request
*/
PutObjectRequest newPutRequest(InputStream inputStream, long length) {
- return newPutObjectRequest(key, newObjectMetadata(length), inputStream);
+ PutObjectRequest request = newPutObjectRequest(key,
+ newObjectMetadata(length), inputStream);
+ return request;
+ }
+
+ /**
+ * Create a {@link PutObjectRequest} request to upload a file.
+ * @param sourceFile source file
+ * @return the request
+ */
+ PutObjectRequest newPutRequest(File sourceFile) {
+ int length = (int) sourceFile.length();
+ PutObjectRequest request = newPutObjectRequest(key,
+ newObjectMetadata(length), sourceFile);
+ return request;
}
/**
@@ -2379,6 +2401,8 @@ public class S3AFileSystem extends FileSystem {
Preconditions.checkNotNull(partETags);
Preconditions.checkArgument(!partETags.isEmpty(),
"No partitions have been uploaded");
+ LOG.debug("Completing multipart upload {} with {} parts",
+ uploadId, partETags.size());
return s3.completeMultipartUpload(
new CompleteMultipartUploadRequest(bucket,
key,
@@ -2389,42 +2413,51 @@ public class S3AFileSystem extends FileSystem {
/**
* Abort a multipart upload operation.
* @param uploadId multipart operation Id
- * @return the result
* @throws AmazonClientException on problems.
*/
void abortMultipartUpload(String uploadId) throws AmazonClientException {
+ LOG.debug("Aborting multipart upload {}", uploadId);
s3.abortMultipartUpload(
new AbortMultipartUploadRequest(bucket, key, uploadId));
}
/**
* Create and initialize a part request of a multipart upload.
+ * Exactly one of: {@code uploadStream} or {@code sourceFile}
+ * must be specified.
* @param uploadId ID of ongoing upload
- * @param uploadStream source of data to upload
* @param partNumber current part number of the upload
* @param size amount of data
+ * @param uploadStream source of data to upload
+ * @param sourceFile optional source file.
* @return the request.
*/
UploadPartRequest newUploadPartRequest(String uploadId,
- InputStream uploadStream,
- int partNumber,
- int size) {
+ int partNumber, int size, InputStream uploadStream, File sourceFile) {
Preconditions.checkNotNull(uploadId);
- Preconditions.checkNotNull(uploadStream);
+ // exactly one source must be set; xor verifies this
+ Preconditions.checkArgument((uploadStream != null) ^ (sourceFile != null),
+ "Data source");
Preconditions.checkArgument(size > 0, "Invalid partition size %s", size);
- Preconditions.checkArgument(partNumber> 0 && partNumber <=10000,
+ Preconditions.checkArgument(partNumber > 0 && partNumber <= 10000,
"partNumber must be between 1 and 10000 inclusive, but is %s",
partNumber);
LOG.debug("Creating part upload request for {} #{} size {}",
uploadId, partNumber, size);
- return new UploadPartRequest()
+ UploadPartRequest request = new UploadPartRequest()
.withBucketName(bucket)
.withKey(key)
.withUploadId(uploadId)
- .withInputStream(uploadStream)
.withPartNumber(partNumber)
.withPartSize(size);
+ if (uploadStream != null) {
+ // there's an upload stream. Bind to it.
+ request.setInputStream(uploadStream);
+ } else {
+ request.setFile(sourceFile);
+ }
+ return request;
}
/**
@@ -2439,6 +2472,21 @@ public class S3AFileSystem extends FileSystem {
sb.append('}');
return sb.toString();
}
+
+ /**
+ * PUT an object directly (i.e. not via the transfer manager).
+ * @param putObjectRequest the request
+ * @return the upload initiated
+ * @throws IOException on problems
+ */
+ PutObjectResult putObject(PutObjectRequest putObjectRequest)
+ throws IOException {
+ try {
+ return putObjectDirect(putObjectRequest);
+ } catch (AmazonClientException e) {
+ throw translateException("put", putObjectRequest.getKey(), e);
+ }
+ }
}
}
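
The reworked newUploadPartRequest() makes the stream-or-file choice explicit:
the xor precondition rejects calls which supply both sources, or neither. A
hypothetical caller, with helper, uploadId, size, uploadStream and sourceFile
assumed in scope:

    // Valid: stream-backed part; the file argument is null.
    UploadPartRequest streamPart = helper.newUploadPartRequest(
        uploadId, 1, size, uploadStream, null);

    // Valid: file-backed part; the stream argument is null.
    UploadPartRequest filePart = helper.newUploadPartRequest(
        uploadId, 2, size, null, sourceFile);

    // Invalid: setting both, or neither, fails the xor check with
    // IllegalArgumentException("Data source").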
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index fb8c852..d2e7a88 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -36,6 +36,7 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -428,7 +429,7 @@ public class S3AInstrumentation {
if (gauge != null) {
gauge.decr(count);
} else {
- LOG.debug("No Gauge: " + op);
+ LOG.debug("No Gauge: {}", op);
}
}
@@ -676,6 +677,8 @@ public class S3AInstrumentation {
private final AtomicLong transferDuration = new AtomicLong(0);
private final AtomicLong queueDuration = new AtomicLong(0);
private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0);
+ private final AtomicInteger blocksAllocated = new AtomicInteger(0);
+ private final AtomicInteger blocksReleased = new AtomicInteger(0);
private Statistics statistics;
@@ -684,6 +687,20 @@ public class S3AInstrumentation {
}
/**
+ * A block has been allocated.
+ */
+ void blockAllocated() {
+ blocksAllocated.incrementAndGet();
+ }
+
+ /**
+ * A block has been released.
+ */
+ void blockReleased() {
+ blocksReleased.incrementAndGet();
+ }
+
+ /**
* Block is queued for upload.
*/
void blockUploadQueued(int blockSize) {
@@ -778,6 +795,24 @@ public class S3AInstrumentation {
return queueDuration.get() + transferDuration.get();
}
+ public int blocksAllocated() {
+ return blocksAllocated.get();
+ }
+
+ public int blocksReleased() {
+ return blocksReleased.get();
+ }
+
+ /**
+ * Get the count of blocks actively allocated; may be inaccurate
+ * if the numbers change during the (non-synchronized) calculation.
+ * @return the number of actively allocated blocks.
+ */
+ public int blocksActivelyAllocated() {
+ return blocksAllocated.get() - blocksReleased.get();
+ }
+
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
@@ -789,6 +824,9 @@ public class S3AInstrumentation {
sb.append(", blockUploadsFailed=").append(blockUploadsFailed);
sb.append(", bytesPendingUpload=").append(bytesPendingUpload);
sb.append(", bytesUploaded=").append(bytesUploaded);
+ sb.append(", blocksAllocated=").append(blocksAllocated);
+ sb.append(", blocksReleased=").append(blocksReleased);
+ sb.append(", blocksActivelyAllocated=").append(blocksActivelyAllocated());
sb.append(", exceptionsInMultipartFinalize=").append(
exceptionsInMultipartFinalize);
sb.append(", transferDuration=").append(transferDuration).append(" ms");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 5311211..84f3c99 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -733,4 +733,30 @@ public final class S3AUtils {
}
return null;
}
+
+ /**
+ * Close the Closeable objects and <b>ignore</b> any Exception or
+ * null pointers.
+ * (This is the SLF4J equivalent of that in {@code IOUtils}).
+ * @param log the log to log at debug level. Can be null.
+ * @param closeables the objects to close
+ */
+ public static void closeAll(Logger log,
+ java.io.Closeable... closeables) {
+ for (java.io.Closeable c : closeables) {
+ if (c != null) {
+ try {
+ if (log != null) {
+ log.debug("Closing {}", c);
+ }
+ c.close();
+ } catch (Exception e) {
+ if (log != null && log.isDebugEnabled()) {
+ log.debug("Exception in closing {}", c, e);
+ }
+ }
+ }
+ }
+ }
+
}
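
Usage of closeAll() matches the IOUtils cleanup helpers, but logs through
SLF4J: nulls are skipped and any exception is caught and logged at debug
level rather than propagated. For example (maybeOpenStream is illustrative):

    // Close several resources in order; failures don't propagate.
    closeAll(LOG, uploadData, block);
    // A null logger is permitted: close silently.
    closeAll(null, maybeOpenStream);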
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
index 74cad00..87f676c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
@@ -24,9 +24,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -38,6 +41,14 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
* multipart tests are kept in scale tests.
*/
public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
+ private static final int BLOCK_SIZE = 256 * 1024;
+
+ private static byte[] dataset;
+
+ @BeforeClass
+ public static void setupDataset() {
+ dataset = ContractTestUtils.dataset(BLOCK_SIZE, 0, 256);
+ }
@Override
protected Configuration createConfiguration() {
@@ -65,9 +76,9 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
}
@Test(expected = IOException.class)
- public void testDoubleStreamClose() throws Throwable {
- Path dest = path("testDoubleStreamClose");
- describe(" testDoubleStreamClose");
+ public void testWriteAfterStreamClose() throws Throwable {
+ Path dest = path("testWriteAfterStreamClose");
+ describe(" testWriteAfterStreamClose");
FSDataOutputStream stream = getFileSystem().create(dest, true);
byte[] data = ContractTestUtils.dataset(16, 'a', 26);
try {
@@ -79,7 +90,25 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
}
}
- public void verifyUpload(String name, int fileSize) throws IOException {
+ @Test
+ public void testBlocksClosed() throws Throwable {
+ Path dest = path("testBlocksClosed");
+ describe(" testBlocksClosed");
+ FSDataOutputStream stream = getFileSystem().create(dest, true);
+ S3AInstrumentation.OutputStreamStatistics statistics
+ = S3ATestUtils.getOutputStreamStatistics(stream);
+ byte[] data = ContractTestUtils.dataset(16, 'a', 26);
+ stream.write(data);
+ LOG.info("closing output stream");
+ stream.close();
+ assertEquals("total allocated blocks in " + statistics,
+ 1, statistics.blocksAllocated());
+ assertEquals("actively allocated blocks in " + statistics,
+ 0, statistics.blocksActivelyAllocated());
+ LOG.info("end of test case");
+ }
+
+ private void verifyUpload(String name, int fileSize) throws IOException {
Path dest = path(name);
describe(name + " upload to " + dest);
ContractTestUtils.createAndVerifyFile(
@@ -87,4 +116,43 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
dest,
fileSize);
}
+
+ /**
+ * Create a factory for use in mark/reset tests.
+ * @param fileSystem source FS
+ * @return the factory
+ */
+ protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) {
+ return new S3ADataBlocks.ArrayBlockFactory(fileSystem);
+ }
+
+ private void markAndResetDatablock(S3ADataBlocks.BlockFactory factory)
+ throws Exception {
+ S3AInstrumentation instrumentation =
+ new S3AInstrumentation(new URI("s3a://example"));
+ S3AInstrumentation.OutputStreamStatistics outstats
+ = instrumentation.newOutputStreamStatistics(null);
+ S3ADataBlocks.DataBlock block = factory.create(1, BLOCK_SIZE, outstats);
+ block.write(dataset, 0, dataset.length);
+ S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
+ InputStream stream = uploadData.getUploadStream();
+ assertNotNull(stream);
+ assertTrue("Mark not supported in " + stream, stream.markSupported());
+ assertEquals(0, stream.read());
+ stream.mark(BLOCK_SIZE);
+ // read a lot
+ long l = 0;
+ while (stream.read() != -1) {
+ // do nothing
+ l++;
+ }
+ stream.reset();
+ assertEquals(1, stream.read());
+ }
+
+ @Test
+ public void testMarkReset() throws Throwable {
+ markAndResetDatablock(createFactory(getFileSystem()));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java
index 504426b..02f3de0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.fs.s3a;
-
/**
* Use {@link Constants#FAST_UPLOAD_BYTEBUFFER} for buffering.
*/
@@ -27,4 +26,8 @@ public class ITestS3ABlockOutputByteBuffer extends ITestS3ABlockOutputArray {
return Constants.FAST_UPLOAD_BYTEBUFFER;
}
+ protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) {
+ return new S3ADataBlocks.ByteBufferBlockFactory(fileSystem);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java
index 550706d..abe8656 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.fs.s3a;
+import org.junit.Assume;
+
/**
* Use {@link Constants#FAST_UPLOAD_BUFFER_DISK} for buffering.
*/
@@ -27,4 +29,14 @@ public class ITestS3ABlockOutputDisk extends ITestS3ABlockOutputArray {
return Constants.FAST_UPLOAD_BUFFER_DISK;
}
+ /**
+ * The disk stream doesn't support mark/reset; calls
+ * {@code Assume} to skip the test.
+ * @param fileSystem source FS
+ * @return null
+ */
+ protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) {
+ Assume.assumeTrue("mark/reset nopt supoprted", false);
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 567bacb..9528967 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
@@ -544,4 +545,16 @@ public final class S3ATestUtils {
}
Assume.assumeTrue(message, condition);
}
+
+ /**
+ * Get the statistics from a wrapped block output stream.
+ * @param out output stream
+ * @return the (active) stats of the write
+ */
+ public static S3AInstrumentation.OutputStreamStatistics
+ getOutputStreamStatistics(FSDataOutputStream out) {
+ S3ABlockOutputStream blockOutputStream
+ = (S3ABlockOutputStream) out.getWrappedStream();
+ return blockOutputStream.getStatistics();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
index 9fa95fd..700ef5c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
@@ -51,9 +51,8 @@ public class TestDataBlocks extends Assert {
new S3ADataBlocks.ByteBufferBlockFactory(null)) {
int limit = 128;
S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
- = factory.create(limit);
- assertEquals("outstanding buffers in " + factory,
- 1, factory.getOutstandingBufferCount());
+ = factory.create(1, limit, null);
+ assertOutstandingBuffers(factory, 1);
byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
int bufferLen = buffer.length;
@@ -66,24 +65,23 @@ public class TestDataBlocks extends Assert {
block.hasCapacity(limit - bufferLen));
// now start the write
- S3ADataBlocks.ByteBufferBlockFactory.ByteBufferInputStream
- stream = block.startUpload();
+ S3ADataBlocks.BlockUploadData blockUploadData = block.startUpload();
+ S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream
+ stream =
+ (S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream)
+ blockUploadData.getUploadStream();
+ assertTrue("Mark not supported in " + stream, stream.markSupported());
assertTrue("!hasRemaining() in " + stream, stream.hasRemaining());
int expected = bufferLen;
assertEquals("wrong available() in " + stream,
expected, stream.available());
assertEquals('t', stream.read());
+ stream.mark(limit);
expected--;
assertEquals("wrong available() in " + stream,
expected, stream.available());
- // close the block. The buffer must remain outstanding here;
- // the stream manages the lifecycle of it now
- block.close();
- assertEquals("outstanding buffers in " + factory,
- 1, factory.getOutstandingBufferCount());
- block.close();
// read into a byte array with an offset
int offset = 5;
@@ -109,16 +107,31 @@ public class TestDataBlocks extends Assert {
0, stream.available());
assertTrue("hasRemaining() in " + stream, !stream.hasRemaining());
+ // go back to the mark point
+ stream.reset();
+ assertEquals('e', stream.read());
+
// when the stream is closed, the data should be returned
stream.close();
- assertEquals("outstanding buffers in " + factory,
- 0, factory.getOutstandingBufferCount());
+ assertOutstandingBuffers(factory, 1);
+ block.close();
+ assertOutstandingBuffers(factory, 0);
stream.close();
- assertEquals("outstanding buffers in " + factory,
- 0, factory.getOutstandingBufferCount());
-
+ assertOutstandingBuffers(factory, 0);
}
}
+ /**
+ * Assert the number of buffers active for a block factory.
+ * @param factory factory
+ * @param expectedCount expected count.
+ */
+ private static void assertOutstandingBuffers(
+ S3ADataBlocks.ByteBufferBlockFactory factory,
+ int expectedCount) {
+ assertEquals("outstanding buffers in " + factory,
+ expectedCount, factory.getOutstandingBufferCount());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index fcb6444..89fae82 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -34,11 +34,13 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.util.Progressable;
@@ -159,13 +161,20 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
Statistic putBytesPending = Statistic.OBJECT_PUT_BYTES_PENDING;
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
-
+ S3AInstrumentation.OutputStreamStatistics streamStatistics;
long blocksPer10MB = blocksPerMB * 10;
ProgressCallback progress = new ProgressCallback(timer);
try (FSDataOutputStream out = fs.create(hugefile,
true,
uploadBlockSize,
progress)) {
+ try {
+ streamStatistics = getOutputStreamStatistics(out);
+ } catch (ClassCastException e) {
+ LOG.info("Wrapped output stream is not block stream: {}",
+ out.getWrappedStream());
+ streamStatistics = null;
+ }
for (long block = 1; block <= blocks; block++) {
out.write(data);
@@ -190,7 +199,8 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
}
}
// now close the file
- LOG.info("Closing file and completing write operation");
+ LOG.info("Closing stream {}", out);
+ LOG.info("Statistics : {}", streamStatistics);
ContractTestUtils.NanoTimer closeTimer
= new ContractTestUtils.NanoTimer();
out.close();
@@ -201,6 +211,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
filesizeMB, uploadBlockSize);
logFSState();
bandwidth(timer, filesize);
+ LOG.info("Statistics after stream closed: {}", streamStatistics);
long putRequestCount = storageStatistics.getLong(putRequests);
Long putByteCount = storageStatistics.getLong(putBytes);
LOG.info("PUT {} bytes in {} operations; {} MB/operation",
@@ -214,7 +225,14 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
S3AFileStatus status = fs.getFileStatus(hugefile);
ContractTestUtils.assertIsFile(hugefile, status);
assertEquals("File size in " + status, filesize, status.getLen());
- progress.verifyNoFailures("Put file " + hugefile + " of size " + filesize);
+ if (progress != null) {
+ progress.verifyNoFailures("Put file " + hugefile
+ + " of size " + filesize);
+ }
+ if (streamStatistics != null) {
+ assertEquals("actively allocated blocks in " + streamStatistics,
+ 0, streamStatistics.blocksActivelyAllocated());
+ }
}
/**
@@ -285,7 +303,9 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
void assumeHugeFileExists() throws IOException {
S3AFileSystem fs = getFileSystem();
ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
- ContractTestUtils.assertIsFile(fs, hugefile);
+ FileStatus status = fs.getFileStatus(hugefile);
+ ContractTestUtils.assertIsFile(hugefile, status);
+ assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
}
private void logFSState() {