Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2018/05/22 20:14:57 UTC

[29/50] [abbrv] hadoop git commit: HADOOP-15478. WASB: hflush() and hsync() regression. Contributed by Thomas Marquardt.

HADOOP-15478. WASB: hflush() and hsync() regression.
Contributed by Thomas Marquardt.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba842847
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba842847
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba842847

Branch: refs/heads/HDDS-48
Commit: ba842847c94d31d3f737226d954c566b5d88656b
Parents: a23ff8d
Author: Steve Loughran <st...@apache.org>
Authored: Mon May 21 11:02:01 2018 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon May 21 11:12:34 2018 +0100

----------------------------------------------------------------------
 .../hadoop/fs/azure/PageBlobOutputStream.java   |  13 +-
 .../fs/azure/SyncableDataOutputStream.java      |   4 -
 .../fs/azure/ITestOutputStreamSemantics.java    | 385 +++++++++++++++++++
 3 files changed, 397 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
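Context for the patch: the hflush()/hsync() regression named in the title is the
out.flush() fallback that SyncableDataOutputStream applied when its wrapped
stream was not Syncable. A plain flush() provides none of the durability
guarantees that hflush()/hsync() promise, so the fix removes that fallback
(the calls become deliberate no-ops for non-Syncable streams) and adds a
test-only waitForLastFlushCompletion() hook to PageBlobOutputStream so the new
integration tests can wait for its asynchronous flush. Below is a minimal
sketch of the resulting dispatch, consolidated from the
SyncableDataOutputStream hunk further down; the sketch class name is made up
for illustration, but org.apache.hadoop.fs.Syncable is the real interface:

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.fs.Syncable;

/** Sketch only: mirrors the fixed logic, not the actual Hadoop class. */
class SyncableDispatchSketch extends DataOutputStream {

  SyncableDispatchSketch(OutputStream out) {
    super(out);
  }

  /** Forward hflush only when the wrapped stream supports it. */
  public void hflush() throws IOException {
    if (out instanceof Syncable) {
      ((Syncable) out).hflush();
    }
    // Deliberately no out.flush() fallback: flush() does not provide
    // hflush() durability, so pretending otherwise masks data-loss risk.
  }

  /** Forward hsync only when the wrapped stream supports it. */
  public void hsync() throws IOException {
    if (out instanceof Syncable) {
      ((Syncable) out).hsync();
    }
  }
}

The integration tests below pin down this distinction: for plain block blobs,
ten hflush()/hsync() calls leave exactly one committed block after close,
while for page blobs and block blobs with compaction, hflush() and hsync()
force the data to storage.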


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba842847/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
index b2b34f8..68ddcdf 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
@@ -376,6 +376,18 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
     outBuffer = new ByteArrayOutputStream();
   }
 
+  @VisibleForTesting
+  synchronized void waitForLastFlushCompletion() throws IOException {
+    try {
+      if (lastQueuedTask != null) {
+        lastQueuedTask.waitTillDone();
+      }
+    } catch (InterruptedException e1) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+    }
+  }
+
   /**
    * Extend the page blob file if we are close to the end.
    */
@@ -554,7 +566,6 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
   }
 
   @Override
-
   public void hflush() throws IOException {
 
     // hflush is required to force data to storage, so call hsync,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba842847/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
index fc8796b..dcfff2f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
@@ -61,8 +61,6 @@ public class SyncableDataOutputStream extends DataOutputStream
   public void hflush() throws IOException {
     if (out instanceof Syncable) {
       ((Syncable) out).hflush();
-    } else {
-      out.flush();
     }
   }
 
@@ -70,8 +68,6 @@ public class SyncableDataOutputStream extends DataOutputStream
   public void hsync() throws IOException {
     if (out instanceof Syncable) {
       ((Syncable) out).hsync();
-    } else {
-      out.flush();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba842847/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java
new file mode 100644
index 0000000..9ac1f73
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.Random;
+
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+import org.hamcrest.core.IsEqual;
+import org.hamcrest.core.IsNot;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.assumeNotNull;
+
+/**
+ * Test semantics of functions flush, hflush, hsync, and close for block blobs,
+ * block blobs with compaction, and page blobs.
+ */
+public class ITestOutputStreamSemantics extends AbstractWasbTestBase {
+
+  private static final String PAGE_BLOB_DIR = "/pageblob";
+  private static final String BLOCK_BLOB_DIR = "/blockblob";
+  private static final String BLOCK_BLOB_COMPACTION_DIR = "/compaction";
+
+  private byte[] getRandomBytes() {
+    byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE
+        - PageBlobFormatHelpers.PAGE_HEADER_SIZE];
+    Random rand = new Random();
+    rand.nextBytes(buffer);
+    return buffer;
+  }
+
+  private Path getBlobPathWithTestName(String parentDir) {
+    return new Path(parentDir + "/" + methodName.getMethodName());
+  }
+
+  private void validate(Path path, byte[] writeBuffer, boolean isEqual)
+      throws IOException {
+    String blobPath = path.toUri().getPath();
+    try (FSDataInputStream inputStream = fs.open(path)) {
+      byte[] readBuffer = new byte[PageBlobFormatHelpers.PAGE_SIZE
+          - PageBlobFormatHelpers.PAGE_HEADER_SIZE];
+      int numBytesRead = inputStream.read(readBuffer, 0, readBuffer.length);
+
+      if (isEqual) {
+        assertArrayEquals(
+            String.format("Bytes read do not match bytes written to %1$s",
+                blobPath),
+            writeBuffer,
+            readBuffer);
+      } else {
+        assertThat(
+            String.format("Bytes read unexpectedly match bytes written to %1$s",
+                blobPath),
+            readBuffer,
+            IsNot.not(IsEqual.equalTo(writeBuffer)));
+      }
+    }
+  }
+
+  private boolean isBlockBlobAppendStreamWrapper(FSDataOutputStream stream) {
+    return
+    ((SyncableDataOutputStream)
+        ((NativeAzureFileSystem.NativeAzureFsOutputStream)
+            stream.getWrappedStream())
+            .getOutStream())
+        .getOutStream()
+        instanceof  BlockBlobAppendStream;
+  }
+
+  private boolean isPageBlobStreamWrapper(FSDataOutputStream stream) {
+    return
+        ((SyncableDataOutputStream) stream.getWrappedStream())
+        .getOutStream()
+            instanceof  PageBlobOutputStream;
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    Configuration conf = new Configuration();
+
+    // Configure the page blob directories
+    conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, PAGE_BLOB_DIR);
+
+    // Configure the block blob with compaction directories
+    conf.set(AzureNativeFileSystemStore.KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES,
+        BLOCK_BLOB_COMPACTION_DIR);
+
+    return AzureBlobStorageTestAccount.create(
+        "",
+        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+        conf);
+  }
+
+  // Verify flush writes data to storage for Page Blobs
+  @Test
+  public void testPageBlobFlush() throws IOException {
+    Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
+
+    try (FSDataOutputStream stream = fs.create(path)) {
+      byte[] buffer = getRandomBytes();
+      stream.write(buffer);
+      stream.flush();
+
+      // flush is asynchronous for Page Blob, so we need to
+      // wait for it to complete
+      SyncableDataOutputStream syncStream =
+          (SyncableDataOutputStream) stream.getWrappedStream();
+      PageBlobOutputStream pageBlobStream =
+          (PageBlobOutputStream)syncStream.getOutStream();
+      pageBlobStream.waitForLastFlushCompletion();
+
+      validate(path, buffer, true);
+    }
+  }
+
+
+  // Verify hflush writes data to storage for Page Blobs
+  @Test
+  public void testPageBlobHFlush() throws IOException {
+    Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
+
+    try (FSDataOutputStream stream = fs.create(path)) {
+      assertTrue(isPageBlobStreamWrapper(stream));
+      byte[] buffer = getRandomBytes();
+      stream.write(buffer);
+      stream.hflush();
+      validate(path, buffer, true);
+    }
+  }
+
+  // HSync must write data to storage for Page Blobs
+  @Test
+  public void testPageBlobHSync() throws IOException {
+    Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
+
+    try (FSDataOutputStream stream = fs.create(path)) {
+      assertTrue(isPageBlobStreamWrapper(stream));
+      byte[] buffer = getRandomBytes();
+      stream.write(buffer);
+      stream.hsync();
+      validate(path, buffer, true);
+    }
+  }
+
+  // Close must write data to storage for Page Blobs
+  @Test
+  public void testPageBlobClose() throws IOException {
+    Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
+
+    try (FSDataOutputStream stream = fs.create(path)) {
+      assertTrue(isPageBlobStreamWrapper(stream));
+      byte[] buffer = getRandomBytes();
+      stream.write(buffer);
+      stream.close();
+      validate(path, buffer, true);
+    }
+  }
+
+  // Verify flush does not write data to storage for Block Blobs
+  @Test
+  public void testBlockBlobFlush() throws Exception {
+    Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
+    byte[] buffer = getRandomBytes();
+
+    try (FSDataOutputStream stream = fs.create(path)) {
+      for (int i = 0; i < 10; i++) {
+        stream.write(buffer);
+        stream.flush();
+      }
+    }
+    String blobPath = path.toUri().getPath();
+    // Create a blob reference to read and validate the block list
+    CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1));
+    // after the stream is closed, the block list should be non-empty
+    ArrayList<BlockEntry> blockList = blob.downloadBlockList(
+        BlockListingFilter.COMMITTED,
+        null,null, null);
+    assertEquals(1, blockList.size());
+  }
+
+  // Verify hflush does not write data to storage for Block Blobs
+  @Test
+  public void testBlockBlobHFlush() throws Exception {
+    Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
+    byte[] buffer = getRandomBytes();
+
+    try (FSDataOutputStream stream = fs.create(path)) {
+      for (int i = 0; i < 10; i++) {
+        stream.write(buffer);
+        stream.hflush();
+      }
+    }
+    String blobPath = path.toUri().getPath();
+    // Create a blob reference to read and validate the block list
+    CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1));
+    // after the stream is closed, the block list should be non-empty
+    ArrayList<BlockEntry> blockList = blob.downloadBlockList(
+        BlockListingFilter.COMMITTED,
+        null,null, null);
+    assertEquals(1, blockList.size());
+  }
+
+  // Verify hsync does not write data to storage for Block Blobs
+  @Test
+  public void testBlockBlobHSync() throws Exception {
+    Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
+    byte[] buffer = getRandomBytes();
+
+    try (FSDataOutputStream stream = fs.create(path)) {
+      for (int i = 0; i < 10; i++) {
+        stream.write(buffer);
+        stream.hsync();
+      }
+    }
+    String blobPath = path.toUri().getPath();
+    // Create a blob reference to read and validate the block list
+    CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1));
+    // after the stream is closed, the block list should be non-empty
+    ArrayList<BlockEntry> blockList = blob.downloadBlockList(
+        BlockListingFilter.COMMITTED,
+        null,null, null);
+    assertEquals(1, blockList.size());
+  }
+
+  // Close must write data to storage for Block Blobs
+  @Test
+  public void testBlockBlobClose() throws IOException {
+    Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
+
+    try (FSDataOutputStream stream = fs.create(path)) {
+      byte[] buffer = getRandomBytes();
+      stream.write(buffer);
+      stream.close();
+      validate(path, buffer, true);
+    }
+  }
+
+  // Verify flush writes data to storage for Block Blobs with compaction
+  @Test
+  public void testBlockBlobCompactionFlush() throws Exception {
+    Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
+    byte[] buffer = getRandomBytes();
+
+    try (FSDataOutputStream stream = fs.create(path)) {
+      assertTrue(isBlockBlobAppendStreamWrapper(stream));
+      for (int i = 0; i < 10; i++) {
+        stream.write(buffer);
+        stream.flush();
+      }
+    }
+    String blobPath = path.toUri().getPath();
+    // Create a blob reference to read and validate the block list
+    CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1));
+    // after the stream is closed, the block list should be non-empty
+    ArrayList<BlockEntry> blockList = blob.downloadBlockList(
+        BlockListingFilter.COMMITTED,
+        null,null, null);
+    assertEquals(1, blockList.size());
+  }
+
+  // Verify hflush writes data to storage for Block Blobs with Compaction
+  @Test
+  public void testBlockBlobCompactionHFlush() throws Exception {
+    Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
+    byte[] buffer = getRandomBytes();
+
+    try (FSDataOutputStream stream = fs.create(path)) {
+      assertTrue(isBlockBlobAppendStreamWrapper(stream));
+      for (int i = 0; i < 10; i++) {
+        stream.write(buffer);
+        stream.hflush();
+      }
+    }
+    String blobPath = path.toUri().getPath();
+    // Create a blob reference to read and validate the block list
+    CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1));
+    // after the stream is closed, the block list should be non-empty
+    ArrayList<BlockEntry> blockList = blob.downloadBlockList(
+        BlockListingFilter.COMMITTED,
+        null,null, null);
+    assertEquals(10, blockList.size());
+  }
+
+  // Verify hsync writes data to storage for Block Blobs with compaction
+  @Test
+  public void testBlockBlobCompactionHSync() throws Exception {
+    Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
+    byte[] buffer = getRandomBytes();
+
+    try (FSDataOutputStream stream = fs.create(path)) {
+      assertTrue(isBlockBlobAppendStreamWrapper(stream));
+      for (int i = 0; i < 10; i++) {
+        stream.write(buffer);
+        stream.hsync();
+      }
+    }
+    String blobPath = path.toUri().getPath();
+    // Create a blob reference to read and validate the block list
+    CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1));
+    // after the stream is closed, the block list should be non-empty
+    ArrayList<BlockEntry> blockList = blob.downloadBlockList(
+        BlockListingFilter.COMMITTED,
+        null,null, null);
+    assertEquals(10, blockList.size());
+  }
+
+  // Close must write data to storage for Block Blobs with compaction
+  @Test
+  public void testBlockBlobCompactionClose() throws IOException {
+    Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
+    try (FSDataOutputStream stream = fs.create(path)) {
+      assertTrue(isBlockBlobAppendStreamWrapper(stream));
+      byte[] buffer = getRandomBytes();
+      stream.write(buffer);
+      stream.close();
+      validate(path, buffer, true);
+    }
+  }
+
+  // A small write does not write data to storage for Page Blobs
+  @Test
+  public void testPageBlobSmallWrite() throws IOException {
+    Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
+    try (FSDataOutputStream stream = fs.create(path)) {
+      assertTrue(isPageBlobStreamWrapper(stream));
+      byte[] buffer = getRandomBytes();
+      stream.write(buffer);
+      validate(path, buffer, false);
+    }
+  }
+
+  // A small write does not write data to storage for Block Blobs
+  @Test
+  public void testBlockBlobSmallWrite() throws IOException {
+    Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
+    try (FSDataOutputStream stream = fs.create(path)) {
+      byte[] buffer = getRandomBytes();
+      stream.write(buffer);
+      validate(path, buffer, false);
+    }
+  }
+
+  // A small write does not write data to storage for Block Blobs
+  // with Compaction
+  @Test
+  public void testBlockBlobCompactionSmallWrite() throws IOException {
+    Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
+    try (FSDataOutputStream stream = fs.create(path)) {
+      assertTrue(isBlockBlobAppendStreamWrapper(stream));
+      byte[] buffer = getRandomBytes();
+      stream.write(buffer);
+      validate(path, buffer, false);
+    }
+  }
+}

