You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink on the original page and is not preserved here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2018/05/21 10:57:58 UTC
hadoop git commit: HADOOP-15478. WASB: hflush() and hsync() regression. Contributed by Thomas Marquardt.
Repository: hadoop
Updated Branches:
refs/heads/branch-2 8f45d119a -> e2d32bd9c
HADOOP-15478. WASB: hflush() and hsync() regression.
Contributed by Thomas Marquardt.
(cherry picked from commit ba842847c94d31d3f737226d954c566b5d88656b)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e2d32bd9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e2d32bd9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e2d32bd9
Branch: refs/heads/branch-2
Commit: e2d32bd9c58056064fb1a5d895df53788e99b288
Parents: 8f45d11
Author: Steve Loughran <st...@apache.org>
Authored: Mon May 21 11:02:01 2018 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon May 21 11:13:57 2018 +0100
----------------------------------------------------------------------
.../hadoop/fs/azure/PageBlobOutputStream.java | 13 +-
.../fs/azure/SyncableDataOutputStream.java | 4 -
.../fs/azure/ITestOutputStreamSemantics.java | 385 +++++++++++++++++++
3 files changed, 397 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2d32bd9/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
index b2b34f8..68ddcdf 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
@@ -376,6 +376,18 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
outBuffer = new ByteArrayOutputStream();
}
+ @VisibleForTesting
+ synchronized void waitForLastFlushCompletion() throws IOException {
+ try {
+ if (lastQueuedTask != null) {
+ lastQueuedTask.waitTillDone();
+ }
+ } catch (InterruptedException e1) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ }
+ }
+
/**
* Extend the page blob file if we are close to the end.
*/
@@ -554,7 +566,6 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
}
@Override
-
public void hflush() throws IOException {
// hflush is required to force data to storage, so call hsync,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2d32bd9/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
index 04545ac..72a88ce 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
@@ -65,8 +65,6 @@ public class SyncableDataOutputStream extends DataOutputStream
public void hflush() throws IOException {
if (out instanceof Syncable) {
((Syncable) out).hflush();
- } else {
- out.flush();
}
}
@@ -74,8 +72,6 @@ public class SyncableDataOutputStream extends DataOutputStream
public void hsync() throws IOException {
if (out instanceof Syncable) {
((Syncable) out).hsync();
- } else {
- out.flush();
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2d32bd9/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java
new file mode 100644
index 0000000..9ac1f73
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.Random;
+
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+import org.hamcrest.core.IsEqual;
+import org.hamcrest.core.IsNot;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.assumeNotNull;
+
+/**
+ * Test semantics of functions flush, hflush, hsync, and close for block blobs,
+ * block blobs with compaction, and page blobs.
+ */
+public class ITestOutputStreamSemantics extends AbstractWasbTestBase {
+
+ private static final String PAGE_BLOB_DIR = "/pageblob";
+ private static final String BLOCK_BLOB_DIR = "/blockblob";
+ private static final String BLOCK_BLOB_COMPACTION_DIR = "/compaction";
+
+ private byte[] getRandomBytes() {
+ byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE
+ - PageBlobFormatHelpers.PAGE_HEADER_SIZE];
+ Random rand = new Random();
+ rand.nextBytes(buffer);
+ return buffer;
+ }
+
+ private Path getBlobPathWithTestName(String parentDir) {
+ return new Path(parentDir + "/" + methodName.getMethodName());
+ }
+
+ private void validate(Path path, byte[] writeBuffer, boolean isEqual)
+ throws IOException {
+ String blobPath = path.toUri().getPath();
+ try (FSDataInputStream inputStream = fs.open(path)) {
+ byte[] readBuffer = new byte[PageBlobFormatHelpers.PAGE_SIZE
+ - PageBlobFormatHelpers.PAGE_HEADER_SIZE];
+ int numBytesRead = inputStream.read(readBuffer, 0, readBuffer.length);
+
+ if (isEqual) {
+ assertArrayEquals(
+ String.format("Bytes read do not match bytes written to %1$s",
+ blobPath),
+ writeBuffer,
+ readBuffer);
+ } else {
+ assertThat(
+ String.format("Bytes read unexpectedly match bytes written to %1$s",
+ blobPath),
+ readBuffer,
+ IsNot.not(IsEqual.equalTo(writeBuffer)));
+ }
+ }
+ }
+
+ private boolean isBlockBlobAppendStreamWrapper(FSDataOutputStream stream) {
+ return
+ ((SyncableDataOutputStream)
+ ((NativeAzureFileSystem.NativeAzureFsOutputStream)
+ stream.getWrappedStream())
+ .getOutStream())
+ .getOutStream()
+ instanceof BlockBlobAppendStream;
+ }
+
+ private boolean isPageBlobStreamWrapper(FSDataOutputStream stream) {
+ return
+ ((SyncableDataOutputStream) stream.getWrappedStream())
+ .getOutStream()
+ instanceof PageBlobOutputStream;
+ }
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ Configuration conf = new Configuration();
+
+ // Configure the page blob directories
+ conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, PAGE_BLOB_DIR);
+
+ // Configure the block blob with compaction directories
+ conf.set(AzureNativeFileSystemStore.KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES,
+ BLOCK_BLOB_COMPACTION_DIR);
+
+ return AzureBlobStorageTestAccount.create(
+ "",
+ EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+ conf);
+ }
+
+ // Verify flush writes data to storage for Page Blobs
+ @Test
+ public void testPageBlobFlush() throws IOException {
+ Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
+
+ try (FSDataOutputStream stream = fs.create(path)) {
+ byte[] buffer = getRandomBytes();
+ stream.write(buffer);
+ stream.flush();
+
+ // flush is asynchronous for Page Blob, so we need to
+ // wait for it to complete
+ SyncableDataOutputStream syncStream =
+ (SyncableDataOutputStream) stream.getWrappedStream();
+ PageBlobOutputStream pageBlobStream =
+ (PageBlobOutputStream)syncStream.getOutStream();
+ pageBlobStream.waitForLastFlushCompletion();
+
+ validate(path, buffer, true);
+ }
+ }
+
+
+ // Verify hflush writes data to storage for Page Blobs
+ @Test
+ public void testPageBlobHFlush() throws IOException {
+ Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
+
+ try (FSDataOutputStream stream = fs.create(path)) {
+ assertTrue(isPageBlobStreamWrapper(stream));
+ byte[] buffer = getRandomBytes();
+ stream.write(buffer);
+ stream.hflush();
+ validate(path, buffer, true);
+ }
+ }
+
+ // HSync must write data to storage for Page Blobs
+ @Test
+ public void testPageBlobHSync() throws IOException {
+ Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
+
+ try (FSDataOutputStream stream = fs.create(path)) {
+ assertTrue(isPageBlobStreamWrapper(stream));
+ byte[] buffer = getRandomBytes();
+ stream.write(buffer);
+ stream.hsync();
+ validate(path, buffer, true);
+ }
+ }
+
+ // Close must write data to storage for Page Blobs
+ @Test
+ public void testPageBlobClose() throws IOException {
+ Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
+
+ try (FSDataOutputStream stream = fs.create(path)) {
+ assertTrue(isPageBlobStreamWrapper(stream));
+ byte[] buffer = getRandomBytes();
+ stream.write(buffer);
+ stream.close();
+ validate(path, buffer, true);
+ }
+ }
+
+ // Verify flush does not write data to storage for Block Blobs
+ @Test
+ public void testBlockBlobFlush() throws Exception {
+ Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
+ byte[] buffer = getRandomBytes();
+
+ try (FSDataOutputStream stream = fs.create(path)) {
+ for (int i = 0; i < 10; i++) {
+ stream.write(buffer);
+ stream.flush();
+ }
+ }
+ String blobPath = path.toUri().getPath();
+ // Create a blob reference to read and validate the block list
+ CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1));
+ // after the stream is closed, the block list should be non-empty
+ ArrayList<BlockEntry> blockList = blob.downloadBlockList(
+ BlockListingFilter.COMMITTED,
+ null,null, null);
+ assertEquals(1, blockList.size());
+ }
+
+ // Verify hflush does not write data to storage for Block Blobs
+ @Test
+ public void testBlockBlobHFlush() throws Exception {
+ Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
+ byte[] buffer = getRandomBytes();
+
+ try (FSDataOutputStream stream = fs.create(path)) {
+ for (int i = 0; i < 10; i++) {
+ stream.write(buffer);
+ stream.hflush();
+ }
+ }
+ String blobPath = path.toUri().getPath();
+ // Create a blob reference to read and validate the block list
+ CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1));
+ // after the stream is closed, the block list should be non-empty
+ ArrayList<BlockEntry> blockList = blob.downloadBlockList(
+ BlockListingFilter.COMMITTED,
+ null,null, null);
+ assertEquals(1, blockList.size());
+ }
+
+ // Verify hsync does not write data to storage for Block Blobs
+ @Test
+ public void testBlockBlobHSync() throws Exception {
+ Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
+ byte[] buffer = getRandomBytes();
+
+ try (FSDataOutputStream stream = fs.create(path)) {
+ for (int i = 0; i < 10; i++) {
+ stream.write(buffer);
+ stream.hsync();
+ }
+ }
+ String blobPath = path.toUri().getPath();
+ // Create a blob reference to read and validate the block list
+ CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1));
+ // after the stream is closed, the block list should be non-empty
+ ArrayList<BlockEntry> blockList = blob.downloadBlockList(
+ BlockListingFilter.COMMITTED,
+ null,null, null);
+ assertEquals(1, blockList.size());
+ }
+
+ // Close must write data to storage for Block Blobs
+ @Test
+ public void testBlockBlobClose() throws IOException {
+ Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
+
+ try (FSDataOutputStream stream = fs.create(path)) {
+ byte[] buffer = getRandomBytes();
+ stream.write(buffer);
+ stream.close();
+ validate(path, buffer, true);
+ }
+ }
+
+ // Verify flush writes data to storage for Block Blobs with compaction
+ @Test
+ public void testBlockBlobCompactionFlush() throws Exception {
+ Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
+ byte[] buffer = getRandomBytes();
+
+ try (FSDataOutputStream stream = fs.create(path)) {
+ assertTrue(isBlockBlobAppendStreamWrapper(stream));
+ for (int i = 0; i < 10; i++) {
+ stream.write(buffer);
+ stream.flush();
+ }
+ }
+ String blobPath = path.toUri().getPath();
+ // Create a blob reference to read and validate the block list
+ CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1));
+ // after the stream is closed, the block list should be non-empty
+ ArrayList<BlockEntry> blockList = blob.downloadBlockList(
+ BlockListingFilter.COMMITTED,
+ null,null, null);
+ assertEquals(1, blockList.size());
+ }
+
+ // Verify hflush writes data to storage for Block Blobs with Compaction
+ @Test
+ public void testBlockBlobCompactionHFlush() throws Exception {
+ Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
+ byte[] buffer = getRandomBytes();
+
+ try (FSDataOutputStream stream = fs.create(path)) {
+ assertTrue(isBlockBlobAppendStreamWrapper(stream));
+ for (int i = 0; i < 10; i++) {
+ stream.write(buffer);
+ stream.hflush();
+ }
+ }
+ String blobPath = path.toUri().getPath();
+ // Create a blob reference to read and validate the block list
+ CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1));
+ // after the stream is closed, the block list should be non-empty
+ ArrayList<BlockEntry> blockList = blob.downloadBlockList(
+ BlockListingFilter.COMMITTED,
+ null,null, null);
+ assertEquals(10, blockList.size());
+ }
+
+ // Verify hsync writes data to storage for Block Blobs with compaction
+ @Test
+ public void testBlockBlobCompactionHSync() throws Exception {
+ Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
+ byte[] buffer = getRandomBytes();
+
+ try (FSDataOutputStream stream = fs.create(path)) {
+ assertTrue(isBlockBlobAppendStreamWrapper(stream));
+ for (int i = 0; i < 10; i++) {
+ stream.write(buffer);
+ stream.hsync();
+ }
+ }
+ String blobPath = path.toUri().getPath();
+ // Create a blob reference to read and validate the block list
+ CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1));
+ // after the stream is closed, the block list should be non-empty
+ ArrayList<BlockEntry> blockList = blob.downloadBlockList(
+ BlockListingFilter.COMMITTED,
+ null,null, null);
+ assertEquals(10, blockList.size());
+ }
+
+ // Close must write data to storage for Block Blobs with compaction
+ @Test
+ public void testBlockBlobCompactionClose() throws IOException {
+ Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
+ try (FSDataOutputStream stream = fs.create(path)) {
+ assertTrue(isBlockBlobAppendStreamWrapper(stream));
+ byte[] buffer = getRandomBytes();
+ stream.write(buffer);
+ stream.close();
+ validate(path, buffer, true);
+ }
+ }
+
+ // A small write does not write data to storage for Page Blobs
+ @Test
+ public void testPageBlobSmallWrite() throws IOException {
+ Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
+ try (FSDataOutputStream stream = fs.create(path)) {
+ assertTrue(isPageBlobStreamWrapper(stream));
+ byte[] buffer = getRandomBytes();
+ stream.write(buffer);
+ validate(path, buffer, false);
+ }
+ }
+
+ // A small write does not write data to storage for Block Blobs
+ @Test
+ public void testBlockBlobSmallWrite() throws IOException {
+ Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
+ try (FSDataOutputStream stream = fs.create(path)) {
+ byte[] buffer = getRandomBytes();
+ stream.write(buffer);
+ validate(path, buffer, false);
+ }
+ }
+
+ // A small write does not write data to storage for Block Blobs
+ // with Compaction
+ @Test
+ public void testBlockBlobCompactionSmallWrite() throws IOException {
+ Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
+ try (FSDataOutputStream stream = fs.create(path)) {
+ assertTrue(isBlockBlobAppendStreamWrapper(stream));
+ byte[] buffer = getRandomBytes();
+ stream.write(buffer);
+ validate(path, buffer, false);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org