You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/09/11 06:48:43 UTC

[18/50] [abbrv] hadoop git commit: HADOOP-14520. WASB: Block compaction for Azure Block Blobs. Contributed by Georgi Chalakov

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemBlockCompaction.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemBlockCompaction.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemBlockCompaction.java
new file mode 100644
index 0000000..820ce4f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemBlockCompaction.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import com.microsoft.azure.storage.blob.BlockEntry;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.List;
+
+/**
+ * Test class that runs WASB block compaction process for block blobs.
+ */
+
+public class TestNativeAzureFileSystemBlockCompaction extends AbstractWasbTestBase {
+
+  private static final String TEST_FILE = "/user/active/test.dat";
+  private static final Path TEST_PATH = new Path(TEST_FILE);
+
+  private static final String TEST_FILE_NORMAL = "/user/normal/test.dat";
+  private static final Path TEST_PATH_NORMAL = new Path(TEST_FILE_NORMAL);
+
+  private AzureBlobStorageTestAccount testAccount = null;
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    testAccount = createTestAccount();
+    fs = testAccount.getFileSystem();
+    Configuration conf = fs.getConf();
+    conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, true);
+    conf.set(AzureNativeFileSystemStore.KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES, "/user/active");
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+  }
+
+  /*
+   * Helper method that creates test data of size provided by the
+   * "size" parameter.
+   */
+  private static byte[] getTestData(int size) {
+    byte[] testData = new byte[size];
+    System.arraycopy(RandomStringUtils.randomAlphabetic(size).getBytes(), 0, testData, 0, size);
+    return testData;
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+  private BlockBlobAppendStream getBlockBlobAppendStream(FSDataOutputStream appendStream) {
+    SyncableDataOutputStream dataOutputStream = null;
+
+    if (appendStream.getWrappedStream() instanceof NativeAzureFileSystem.NativeAzureFsOutputStream) {
+      NativeAzureFileSystem.NativeAzureFsOutputStream fsOutputStream =
+              (NativeAzureFileSystem.NativeAzureFsOutputStream) appendStream.getWrappedStream();
+
+      dataOutputStream = (SyncableDataOutputStream) fsOutputStream.getOutStream();
+    }
+
+    if (appendStream.getWrappedStream() instanceof SyncableDataOutputStream) {
+      dataOutputStream = (SyncableDataOutputStream) appendStream.getWrappedStream();
+    }
+
+    Assert.assertNotNull("Did not recognize " + dataOutputStream,
+        dataOutputStream);
+
+    return (BlockBlobAppendStream) dataOutputStream.getOutStream();
+  }
+
+  private void verifyBlockList(BlockBlobAppendStream blockBlobStream,
+                               int[] testData) throws Throwable {
+    List<BlockEntry> blockList = blockBlobStream.getBlockList();
+    Assert.assertEquals("Block list length", testData.length, blockList.size());
+
+    int i = 0;
+    for (BlockEntry block: blockList) {
+      Assert.assertTrue(block.getSize() == testData[i++]);
+    }
+  }
+
+  private void appendBlockList(FSDataOutputStream fsStream,
+                              ByteArrayOutputStream memStream,
+                              int[] testData) throws Throwable {
+
+    for (int d: testData) {
+      byte[] data = getTestData(d);
+      memStream.write(data);
+      fsStream.write(data);
+    }
+    fsStream.hflush();
+  }
+
+  @Test
+  public void testCompactionDisabled() throws Throwable {
+
+    try (FSDataOutputStream appendStream = fs.create(TEST_PATH_NORMAL)) {
+
+      // testing new file
+
+      SyncableDataOutputStream dataOutputStream = null;
+
+      OutputStream wrappedStream = appendStream.getWrappedStream();
+      if (wrappedStream instanceof NativeAzureFileSystem.NativeAzureFsOutputStream) {
+        NativeAzureFileSystem.NativeAzureFsOutputStream fsOutputStream =
+                (NativeAzureFileSystem.NativeAzureFsOutputStream) wrappedStream;
+
+        dataOutputStream = (SyncableDataOutputStream) fsOutputStream.getOutStream();
+      } else if (wrappedStream instanceof SyncableDataOutputStream) {
+        dataOutputStream = (SyncableDataOutputStream) wrappedStream;
+      } else {
+        Assert.fail("Unable to determine type of " + wrappedStream
+            + " class of " + wrappedStream.getClass());
+      }
+
+      Assert.assertFalse("Data output stream is a BlockBlobAppendStream: "
+          + dataOutputStream,
+          dataOutputStream.getOutStream() instanceof BlockBlobAppendStream);
+
+    }
+  }
+
+  @Test
+  public void testCompaction() throws Throwable {
+
+    final int n2 = 2;
+    final int n4 = 4;
+    final int n10 = 10;
+    final int n12 = 12;
+    final int n14 = 14;
+    final int n16 = 16;
+
+    final int maxBlockSize = 16;
+    final int compactionBlockCount = 4;
+
+    ByteArrayOutputStream memStream = new ByteArrayOutputStream();
+
+    try (FSDataOutputStream appendStream = fs.create(TEST_PATH)) {
+
+      // test new file
+
+      BlockBlobAppendStream blockBlobStream = getBlockBlobAppendStream(appendStream);
+      blockBlobStream.setMaxBlockSize(maxBlockSize);
+      blockBlobStream.setCompactionBlockCount(compactionBlockCount);
+
+      appendBlockList(appendStream, memStream, new int[]{n2});
+      verifyBlockList(blockBlobStream, new int[]{n2});
+
+      appendStream.hflush();
+      verifyBlockList(blockBlobStream, new int[]{n2});
+
+      appendBlockList(appendStream, memStream, new int[]{n4});
+      verifyBlockList(blockBlobStream, new int[]{n2, n4});
+
+      appendStream.hsync();
+      verifyBlockList(blockBlobStream, new int[]{n2, n4});
+
+      appendBlockList(appendStream, memStream, new int[]{n4});
+      verifyBlockList(blockBlobStream, new int[]{n2, n4, n4});
+
+      appendBlockList(appendStream, memStream, new int[]{n4});
+      verifyBlockList(blockBlobStream, new int[]{n2, n4, n4, n4});
+
+      appendBlockList(appendStream, memStream, new int[]{n4});
+      verifyBlockList(blockBlobStream, new int[]{n14, n4});
+
+      appendBlockList(appendStream, memStream, new int[]{n4});
+      verifyBlockList(blockBlobStream, new int[]{n14, n4, n4});
+
+      appendBlockList(appendStream, memStream, new int[]{n4});
+      verifyBlockList(blockBlobStream, new int[]{n14, n4, n4, n4});
+
+      appendBlockList(appendStream, memStream, new int[]{n2, n4, n4});
+      verifyBlockList(blockBlobStream, new int[]{n14, n12, n10});
+
+      appendBlockList(appendStream, memStream, new int[]{n4});
+      verifyBlockList(blockBlobStream, new int[]{n14, n12, n10, n4});
+
+      appendBlockList(appendStream, memStream,
+              new int[]{n4, n4, n4, n4});
+      verifyBlockList(blockBlobStream,
+              new int[]{n14, n12, n14, n16});
+
+      appendBlockList(appendStream, memStream,
+              new int[]{n4, n4, n4, n4, n4});
+      verifyBlockList(blockBlobStream,
+              new int[]{n14, n12, n14, n16, n16, n4});
+
+      appendBlockList(appendStream, memStream,
+              new int[]{n4});
+      verifyBlockList(blockBlobStream,
+              new int[]{n14, n12, n14, n16, n16, n4, n4});
+
+      appendBlockList(appendStream, memStream,
+              new int[]{n4});
+      verifyBlockList(blockBlobStream,
+              new int[]{n14, n12, n14, n16, n16, n4, n4, n4});
+
+      appendBlockList(appendStream, memStream,
+              new int[]{n4});
+      verifyBlockList(blockBlobStream,
+              new int[]{n14, n12, n14, n16, n16, n4, n4, n4, n4});
+
+      appendBlockList(appendStream, memStream, new int[]{n4});
+
+      appendStream.close();
+
+      ContractTestUtils.verifyFileContents(fs, TEST_PATH, memStream.toByteArray());
+    }
+
+    try (FSDataOutputStream appendStream = fs.append(TEST_PATH)) {
+
+      // test existing file
+
+      BlockBlobAppendStream blockBlobStream = getBlockBlobAppendStream(appendStream);
+      blockBlobStream.setMaxBlockSize(maxBlockSize);
+      blockBlobStream.setCompactionBlockCount(compactionBlockCount);
+
+      appendBlockList(appendStream, memStream, new int[]{n4});
+      verifyBlockList(blockBlobStream,
+              new int[]{n14, n12, n14, n16, n16, n16, n4, n4});
+
+      appendBlockList(appendStream, memStream, new int[]{n4});
+      verifyBlockList(blockBlobStream,
+              new int[]{n14, n12, n14, n16, n16, n16, n4, n4, n4});
+
+      appendBlockList(appendStream, memStream, new int[]{n4});
+      verifyBlockList(blockBlobStream,
+              new int[]{n14, n12, n14, n16, n16, n16, n4, n4, n4, n4});
+
+      appendBlockList(appendStream, memStream, new int[]{n4});
+      verifyBlockList(blockBlobStream,
+              new int[]{n14, n12, n14, n16, n16, n16, n16, n4});
+
+      appendStream.close();
+
+      ContractTestUtils.verifyFileContents(fs, TEST_PATH, memStream.toByteArray());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties
index 73ee3f9..a5e0c4f 100644
--- a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties
+++ b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties
@@ -23,3 +23,4 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t]: %c{2} (%F:%M(%L)) - %m%n
 
 log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG
+log4j.logger.org.apache.hadoop.fs.azure.BlockBlobAppendStream=DEBUG


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org