Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2017/09/15 16:56:53 UTC
[43/50] [abbrv] hadoop git commit: HADOOP-14553. Add (parallelized)
integration tests to hadoop-azure. Contributed by Steve Loughran.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
index 177477c..726b504 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
@@ -18,12 +18,6 @@
package org.apache.hadoop.fs.azure;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
@@ -47,16 +41,18 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
-import org.apache.hadoop.fs.azure.AzureException;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
+import static org.apache.hadoop.test.GenericTestUtils.*;
+
/*
* Tests the Native Azure file system (WASB) against an actual blob store if
* provided in the environment.
@@ -71,15 +67,46 @@ public abstract class NativeAzureFileSystemBaseTest
private final long modifiedTimeErrorMargin = 5 * 1000; // Give it +/-5 seconds
public static final Log LOG = LogFactory.getLog(NativeAzureFileSystemBaseTest.class);
+ protected NativeAzureFileSystem fs;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ fs = getFileSystem();
+ }
+
+ /**
+ * Assert that a path does not exist.
+ *
+ * @param message message to include in the assertion failure message
+ * @param path path in the filesystem
+ * @throws IOException IO problems
+ */
+ public void assertPathDoesNotExist(String message,
+ Path path) throws IOException {
+ ContractTestUtils.assertPathDoesNotExist(fs, message, path);
+ }
+
+ /**
+ * Assert that a path exists.
+ *
+ * @param message message to include in the assertion failure message
+ * @param path path in the filesystem
+ * @throws IOException IO problems
+ */
+ public void assertPathExists(String message,
+ Path path) throws IOException {
+ ContractTestUtils.assertPathExists(fs, message, path);
+ }
@Test
public void testCheckingNonExistentOneLetterFile() throws Exception {
- assertFalse(fs.exists(new Path("/a")));
+ assertPathDoesNotExist("one letter file", new Path("/a"));
}
@Test
public void testStoreRetrieveFile() throws Exception {
- Path testFile = new Path("unit-test-file");
+ Path testFile = methodPath();
writeString(testFile, "Testing");
assertTrue(fs.exists(testFile));
FileStatus status = fs.getFileStatus(testFile);
@@ -93,7 +120,7 @@ public abstract class NativeAzureFileSystemBaseTest
@Test
public void testStoreDeleteFolder() throws Exception {
- Path testFolder = new Path("storeDeleteFolder");
+ Path testFolder = methodPath();
assertFalse(fs.exists(testFolder));
assertTrue(fs.mkdirs(testFolder));
assertTrue(fs.exists(testFolder));
@@ -105,22 +132,22 @@ public abstract class NativeAzureFileSystemBaseTest
assertEquals(new FsPermission((short) 0755), status.getPermission());
Path innerFile = new Path(testFolder, "innerFile");
assertTrue(fs.createNewFile(innerFile));
- assertTrue(fs.exists(innerFile));
+ assertPathExists("inner file", innerFile);
assertTrue(fs.delete(testFolder, true));
- assertFalse(fs.exists(innerFile));
- assertFalse(fs.exists(testFolder));
+ assertPathDoesNotExist("inner file", innerFile);
+ assertPathDoesNotExist("testFolder", testFolder);
}
@Test
public void testFileOwnership() throws Exception {
- Path testFile = new Path("ownershipTestFile");
+ Path testFile = methodPath();
writeString(testFile, "Testing");
testOwnership(testFile);
}
@Test
public void testFolderOwnership() throws Exception {
- Path testFolder = new Path("ownershipTestFolder");
+ Path testFolder = methodPath();
fs.mkdirs(testFolder);
testOwnership(testFolder);
}
@@ -147,7 +174,7 @@ public abstract class NativeAzureFileSystemBaseTest
@Test
public void testFilePermissions() throws Exception {
- Path testFile = new Path("permissionTestFile");
+ Path testFile = methodPath();
FsPermission permission = FsPermission.createImmutable((short) 644);
createEmptyFile(testFile, permission);
FileStatus ret = fs.getFileStatus(testFile);
@@ -157,7 +184,7 @@ public abstract class NativeAzureFileSystemBaseTest
@Test
public void testFolderPermissions() throws Exception {
- Path testFolder = new Path("permissionTestFolder");
+ Path testFolder = methodPath();
FsPermission permission = FsPermission.createImmutable((short) 644);
fs.mkdirs(testFolder, permission);
FileStatus ret = fs.getFileStatus(testFolder);
@@ -176,9 +203,9 @@ public abstract class NativeAzureFileSystemBaseTest
createEmptyFile(testFile, permission);
FsPermission rootPerm = fs.getFileStatus(firstDir.getParent()).getPermission();
FsPermission inheritPerm = FsPermission.createImmutable((short)(rootPerm.toShort() | 0300));
- assertTrue(fs.exists(testFile));
- assertTrue(fs.exists(firstDir));
- assertTrue(fs.exists(middleDir));
+ assertPathExists("test file", testFile);
+ assertPathExists("firstDir", firstDir);
+ assertPathExists("middleDir", middleDir);
// verify that the indirectly created directory inherited its permissions from the root directory
FileStatus directoryStatus = fs.getFileStatus(middleDir);
assertTrue(directoryStatus.isDirectory());
@@ -188,7 +215,7 @@ public abstract class NativeAzureFileSystemBaseTest
assertFalse(fileStatus.isDirectory());
assertEqualsIgnoreStickyBit(umaskedPermission, fileStatus.getPermission());
assertTrue(fs.delete(firstDir, true));
- assertFalse(fs.exists(testFile));
+ assertPathDoesNotExist("deleted file", testFile);
// An alternative test scenario would've been to delete the file first,
// and then check for the existence of the upper folders still. But that
@@ -264,7 +291,7 @@ public abstract class NativeAzureFileSystemBaseTest
assertTrue(fs.delete(new Path("deep"), true));
}
- private static enum RenameFolderVariation {
+ private enum RenameFolderVariation {
CreateFolderAndInnerFile, CreateJustInnerFile, CreateJustFolder
}
@@ -303,10 +330,10 @@ public abstract class NativeAzureFileSystemBaseTest
localFs.delete(localFilePath, true);
try {
writeString(localFs, localFilePath, "Testing");
- Path dstPath = new Path("copiedFromLocal");
+ Path dstPath = methodPath();
assertTrue(FileUtil.copy(localFs, localFilePath, fs, dstPath, false,
fs.getConf()));
- assertTrue(fs.exists(dstPath));
+ assertPathExists("coied from local", dstPath);
assertEquals("Testing", readString(fs, dstPath));
fs.delete(dstPath, true);
} finally {
@@ -423,32 +450,32 @@ public abstract class NativeAzureFileSystemBaseTest
@Test
public void testReadingDirectoryAsFile() throws Exception {
- Path dir = new Path("/x");
+ Path dir = methodPath();
assertTrue(fs.mkdirs(dir));
try {
fs.open(dir).close();
assertTrue("Should've thrown", false);
} catch (FileNotFoundException ex) {
- assertEquals("/x is a directory not a file.", ex.getMessage());
+ assertExceptionContains("a directory not a file.", ex);
}
}
@Test
public void testCreatingFileOverDirectory() throws Exception {
- Path dir = new Path("/x");
+ Path dir = methodPath();
assertTrue(fs.mkdirs(dir));
try {
fs.create(dir).close();
assertTrue("Should've thrown", false);
} catch (IOException ex) {
- assertEquals("Cannot create file /x; already exists as a directory.",
- ex.getMessage());
+ assertExceptionContains("Cannot create file", ex);
+ assertExceptionContains("already exists as a directory", ex);
}
}
@Test
public void testInputStreamReadWithZeroSizeBuffer() throws Exception {
- Path newFile = new Path("zeroSizeRead");
+ Path newFile = methodPath();
OutputStream output = fs.create(newFile);
output.write(10);
output.close();
@@ -460,7 +487,7 @@ public abstract class NativeAzureFileSystemBaseTest
@Test
public void testInputStreamReadWithBufferReturnsMinusOneOnEof() throws Exception {
- Path newFile = new Path("eofRead");
+ Path newFile = methodPath();
OutputStream output = fs.create(newFile);
output.write(10);
output.close();
@@ -482,7 +509,7 @@ public abstract class NativeAzureFileSystemBaseTest
@Test
public void testInputStreamReadWithBufferReturnsMinusOneOnEofForLargeBuffer() throws Exception {
- Path newFile = new Path("eofRead2");
+ Path newFile = methodPath();
OutputStream output = fs.create(newFile);
byte[] outputBuff = new byte[97331];
for(int i = 0; i < outputBuff.length; ++i) {
@@ -508,7 +535,7 @@ public abstract class NativeAzureFileSystemBaseTest
@Test
public void testInputStreamReadIntReturnsMinusOneOnEof() throws Exception {
- Path newFile = new Path("eofRead3");
+ Path newFile = methodPath();
OutputStream output = fs.create(newFile);
output.write(10);
output.close();
@@ -525,7 +552,7 @@ public abstract class NativeAzureFileSystemBaseTest
@Test
public void testSetPermissionOnFile() throws Exception {
- Path newFile = new Path("testPermission");
+ Path newFile = methodPath();
OutputStream output = fs.create(newFile);
output.write(13);
output.close();
@@ -540,14 +567,14 @@ public abstract class NativeAzureFileSystemBaseTest
// Don't check the file length for page blobs. Only block blobs
// provide the actual length of bytes written.
- if (!(this instanceof TestNativeAzureFSPageBlobLive)) {
+ if (!(this instanceof ITestNativeAzureFSPageBlobLive)) {
assertEquals(1, newStatus.getLen());
}
}
@Test
public void testSetPermissionOnFolder() throws Exception {
- Path newFolder = new Path("testPermission");
+ Path newFolder = methodPath();
assertTrue(fs.mkdirs(newFolder));
FsPermission newPermission = new FsPermission((short) 0600);
fs.setPermission(newFolder, newPermission);
@@ -559,7 +586,7 @@ public abstract class NativeAzureFileSystemBaseTest
@Test
public void testSetOwnerOnFile() throws Exception {
- Path newFile = new Path("testOwner");
+ Path newFile = methodPath();
OutputStream output = fs.create(newFile);
output.write(13);
output.close();
@@ -571,7 +598,7 @@ public abstract class NativeAzureFileSystemBaseTest
// File length is only reported to be the size of bytes written to the file for block blobs.
// So only check it for block blobs, not page blobs.
- if (!(this instanceof TestNativeAzureFSPageBlobLive)) {
+ if (!(this instanceof ITestNativeAzureFSPageBlobLive)) {
assertEquals(1, newStatus.getLen());
}
fs.setOwner(newFile, null, "newGroup");
@@ -583,7 +610,7 @@ public abstract class NativeAzureFileSystemBaseTest
@Test
public void testSetOwnerOnFolder() throws Exception {
- Path newFolder = new Path("testOwner");
+ Path newFolder = methodPath();
assertTrue(fs.mkdirs(newFolder));
fs.setOwner(newFolder, "newUser", null);
FileStatus newStatus = fs.getFileStatus(newFolder);
@@ -594,21 +621,21 @@ public abstract class NativeAzureFileSystemBaseTest
@Test
public void testModifiedTimeForFile() throws Exception {
- Path testFile = new Path("testFile");
+ Path testFile = methodPath();
fs.create(testFile).close();
testModifiedTime(testFile);
}
@Test
public void testModifiedTimeForFolder() throws Exception {
- Path testFolder = new Path("testFolder");
+ Path testFolder = methodPath();
assertTrue(fs.mkdirs(testFolder));
testModifiedTime(testFolder);
}
@Test
public void testFolderLastModifiedTime() throws Exception {
- Path parentFolder = new Path("testFolder");
+ Path parentFolder = methodPath();
Path innerFile = new Path(parentFolder, "innerfile");
assertTrue(fs.mkdirs(parentFolder));
@@ -983,7 +1010,7 @@ public abstract class NativeAzureFileSystemBaseTest
// Make sure rename pending file is gone.
FileStatus[] listed = fs.listStatus(new Path("/"));
- assertEquals(1, listed.length);
+ assertEquals("Pending directory still found", 1, listed.length);
assertTrue(listed[0].isDirectory());
}
@@ -1681,7 +1708,7 @@ public abstract class NativeAzureFileSystemBaseTest
assertTrue("Unanticipated exception", false);
}
} else {
- assertTrue("Unknown thread name", false);
+ fail("Unknown thread name");
}
LOG.info(name + " is exiting.");
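The recurring edits in this file swap fixed paths such as new Path("unit-test-file") for methodPath(), and exact-message checks for substring assertions from GenericTestUtils. A minimal sketch of both patterns follows; the class name, the /test/ prefix, and the getFileSystem() hook are illustrative assumptions, not the actual hadoop-azure helpers:

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.fail;

// Illustrative sketch only; the real methodPath() lives in the
// hadoop-azure test base classes and may be implemented differently.
public abstract class MethodPathSketch {

  // JUnit rule exposing the name of the currently executing test method.
  @Rule
  public final TestName methodName = new TestName();

  protected abstract FileSystem getFileSystem();

  // A path unique to the running test method, so tests executing in
  // parallel never collide on a shared filename.
  protected Path methodPath() {
    return new Path("/test/" + methodName.getMethodName());
  }

  @Test
  public void testOpenDirectoryFails() throws IOException {
    FileSystem fs = getFileSystem();
    Path dir = methodPath();   // e.g. /test/testOpenDirectoryFails
    fs.mkdirs(dir);
    try {
      fs.open(dir).close();
      fail("expected a FileNotFoundException");
    } catch (FileNotFoundException ex) {
      // Message text varies between stores, so assert a substring
      // rather than the full message.
      assertExceptionContains("a directory not a file", ex);
    }
  }
}

Substring assertions keep the tests stable if the error text ever gains a path or account name, and per-method paths are what makes the parallelized runs in this patch safe.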
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt
deleted file mode 100644
index 54ba4d8..0000000
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt
+++ /dev/null
@@ -1,22 +0,0 @@
-========================================================================
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-=========================================================================
-
-In order to run Windows Azure Storage Blob (WASB) unit tests against a live
-Azure Storage account, you need to provide test account details in a configuration
-file called azure-test.xml. See hadoop-tools/hadoop-azure/README.txt for details
-on configuration, and how to run the tests.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
deleted file mode 100644
index a10a366..0000000
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azure;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeNotNull;
-
-import java.io.*;
-import java.util.Arrays;
-
-import org.apache.hadoop.fs.azure.AzureException;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestAzureConcurrentOutOfBandIo {
-
- // Class constants.
- static final int DOWNLOAD_BLOCK_SIZE = 8 * 1024 * 1024;
- static final int UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
- static final int BLOB_SIZE = 32 * 1024 * 1024;
-
- // Number of blocks to be written before flush.
- static final int NUMBER_OF_BLOCKS = 2;
-
- protected AzureBlobStorageTestAccount testAccount;
-
- // Overridden TestCase methods.
- @Before
- public void setUp() throws Exception {
- testAccount = AzureBlobStorageTestAccount.createOutOfBandStore(
- UPLOAD_BLOCK_SIZE, DOWNLOAD_BLOCK_SIZE);
- assumeNotNull(testAccount);
- }
-
- @After
- public void tearDown() throws Exception {
- if (testAccount != null) {
- testAccount.cleanup();
- testAccount = null;
- }
- }
-
- class DataBlockWriter implements Runnable {
-
- Thread runner;
- AzureBlobStorageTestAccount writerStorageAccount;
- String key;
- boolean done = false;
-
- /**
- * Constructor captures the test account.
- *
- * @param testAccount
- */
- public DataBlockWriter(AzureBlobStorageTestAccount testAccount, String key) {
- writerStorageAccount = testAccount;
- this.key = key;
- }
-
- /**
- * Start writing blocks to Azure storage.
- */
- public void startWriting() {
- runner = new Thread(this); // Create the block writer thread.
- runner.start(); // Start the block writer thread.
- }
-
- /**
- * Stop writing blocks to Azure storage.
- */
- public void stopWriting() {
- done = true;
- }
-
- /**
- * Implementation of the runnable interface. The run method is a tight loop
- * which repeatedly updates the blob with a 4 MB block.
- */
- public void run() {
- byte[] dataBlockWrite = new byte[UPLOAD_BLOCK_SIZE];
-
- OutputStream outputStream = null;
-
- try {
- for (int i = 0; !done; i++) {
- // Write two 4 MB blocks to the blob.
- //
- outputStream = writerStorageAccount.getStore().storefile(
- key,
- new PermissionStatus("", "", FsPermission.getDefault()),
- key);
-
- Arrays.fill(dataBlockWrite, (byte) (i % 256));
- for (int j = 0; j < NUMBER_OF_BLOCKS; j++) {
- outputStream.write(dataBlockWrite);
- }
-
- outputStream.flush();
- outputStream.close();
- }
- } catch (AzureException e) {
- System.out
- .println("DatablockWriter thread encountered a storage exception."
- + e.getMessage());
- } catch (IOException e) {
- System.out
- .println("DatablockWriter thread encountered an I/O exception."
- + e.getMessage());
- }
- }
- }
-
- @Test
- public void testReadOOBWrites() throws Exception {
-
- byte[] dataBlockWrite = new byte[UPLOAD_BLOCK_SIZE];
- byte[] dataBlockRead = new byte[UPLOAD_BLOCK_SIZE];
-
- // Write to blob to make sure it exists.
- //
- // Write five 4 MB blocks to the blob. To ensure there is data in the blob before
- // reading. This eliminates the race between the reader and writer threads.
- OutputStream outputStream = testAccount.getStore().storefile(
- "WASB_String.txt",
- new PermissionStatus("", "", FsPermission.getDefault()),
- "WASB_String.txt");
- Arrays.fill(dataBlockWrite, (byte) 255);
- for (int i = 0; i < NUMBER_OF_BLOCKS; i++) {
- outputStream.write(dataBlockWrite);
- }
-
- outputStream.flush();
- outputStream.close();
-
- // Start writing blocks to Azure store using the DataBlockWriter thread.
- DataBlockWriter writeBlockTask = new DataBlockWriter(testAccount,
- "WASB_String.txt");
- writeBlockTask.startWriting();
- int count = 0;
- InputStream inputStream = null;
-
- for (int i = 0; i < 5; i++) {
- try {
- inputStream = testAccount.getStore().retrieve("WASB_String.txt");
- count = 0;
- int c = 0;
-
- while (c >= 0) {
- c = inputStream.read(dataBlockRead, 0, UPLOAD_BLOCK_SIZE);
- if (c < 0) {
- break;
- }
-
- // Counting the number of bytes.
- count += c;
- }
- } catch (IOException e) {
- System.out.println(e.getCause().toString());
- e.printStackTrace();
- fail();
- }
-
- // Close the stream.
- if (null != inputStream){
- inputStream.close();
- }
- }
-
- // Stop writing blocks.
- writeBlockTask.stopWriting();
-
- // Validate that a block was read.
- assertEquals(NUMBER_OF_BLOCKS * UPLOAD_BLOCK_SIZE, count);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIoWithSecureMode.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIoWithSecureMode.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIoWithSecureMode.java
deleted file mode 100644
index 687b785..0000000
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIoWithSecureMode.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azure;
-
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeNotNull;
-
-/**
- * Extends TestAzureConcurrentOutOfBandIo in order to run testReadOOBWrites with secure mode
- * (fs.azure.secure.mode) both enabled and disabled.
- */
-public class TestAzureConcurrentOutOfBandIoWithSecureMode extends TestAzureConcurrentOutOfBandIo {
-
- // Overridden TestCase methods.
- @Before
- @Override
- public void setUp() throws Exception {
- testAccount = AzureBlobStorageTestAccount.createOutOfBandStore(
- UPLOAD_BLOCK_SIZE, DOWNLOAD_BLOCK_SIZE, true);
- assumeNotNull(testAccount);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
deleted file mode 100644
index c985224..0000000
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azure;
-
-import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.NO_ACCESS_TO_CONTAINER_MSG;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeNotNull;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.HashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext;
-import org.apache.hadoop.test.GenericTestUtils;
-
-import org.junit.Test;
-
-import com.microsoft.azure.storage.OperationContext;
-import com.microsoft.azure.storage.SendingRequestEvent;
-import com.microsoft.azure.storage.StorageEvent;
-
-public class TestAzureFileSystemErrorConditions {
- private static final int ALL_THREE_FILE_SIZE = 1024;
-
- @Test
- public void testNoInitialize() throws Exception {
- AzureNativeFileSystemStore store = new AzureNativeFileSystemStore();
- boolean passed = false;
- try {
- store.retrieveMetadata("foo");
- passed = true;
- } catch (AssertionError e) {
- }
- assertFalse(
- "Doing an operation on the store should throw if not initalized.",
- passed);
- }
-
- /**
- * Try accessing an unauthorized or non-existent (treated the same) container
- * from WASB.
- */
- @Test
- public void testAccessUnauthorizedPublicContainer() throws Exception {
- final String container = "nonExistentContainer";
- final String account = "hopefullyNonExistentAccount";
- Path noAccessPath = new Path(
- "wasb://" + container + "@" + account + "/someFile");
- NativeAzureFileSystem.suppressRetryPolicy();
- try {
- FileSystem.get(noAccessPath.toUri(), new Configuration())
- .open(noAccessPath);
- assertTrue("Should've thrown.", false);
- } catch (AzureException ex) {
- GenericTestUtils.assertExceptionContains(
- String.format(NO_ACCESS_TO_CONTAINER_MSG, account, container), ex);
- } finally {
- NativeAzureFileSystem.resumeRetryPolicy();
- }
- }
-
- @Test
- public void testAccessContainerWithWrongVersion() throws Exception {
- AzureNativeFileSystemStore store = new AzureNativeFileSystemStore();
- MockStorageInterface mockStorage = new MockStorageInterface();
- store.setAzureStorageInteractionLayer(mockStorage);
- FileSystem fs = new NativeAzureFileSystem(store);
- try {
- Configuration conf = new Configuration();
- AzureBlobStorageTestAccount.setMockAccountKey(conf);
- HashMap<String, String> metadata = new HashMap<String, String>();
- metadata.put(AzureNativeFileSystemStore.VERSION_METADATA_KEY,
- "2090-04-05"); // It's from the future!
- mockStorage.addPreExistingContainer(
- AzureBlobStorageTestAccount.getMockContainerUri(), metadata);
-
- boolean passed = false;
- try {
- fs.initialize(new URI(AzureBlobStorageTestAccount.MOCK_WASB_URI), conf);
- fs.listStatus(new Path("/"));
- passed = true;
- } catch (AzureException ex) {
- assertTrue("Unexpected exception message: " + ex,
- ex.getMessage().contains("unsupported version: 2090-04-05."));
- }
- assertFalse("Should've thrown an exception because of the wrong version.",
- passed);
- } finally {
- fs.close();
- }
- }
-
- private interface ConnectionRecognizer {
- boolean isTargetConnection(HttpURLConnection connection);
- }
-
- private class TransientErrorInjector extends StorageEvent<SendingRequestEvent> {
- final ConnectionRecognizer connectionRecognizer;
- private boolean injectedErrorOnce = false;
-
- public TransientErrorInjector(ConnectionRecognizer connectionRecognizer) {
- this.connectionRecognizer = connectionRecognizer;
- }
-
- @Override
- public void eventOccurred(SendingRequestEvent eventArg) {
- HttpURLConnection connection = (HttpURLConnection)eventArg.getConnectionObject();
- if (!connectionRecognizer.isTargetConnection(connection)) {
- return;
- }
- if (!injectedErrorOnce) {
- connection.setReadTimeout(1);
- connection.disconnect();
- injectedErrorOnce = true;
- }
- }
- }
-
- private void injectTransientError(NativeAzureFileSystem fs,
- final ConnectionRecognizer connectionRecognizer) {
- fs.getStore().addTestHookToOperationContext(new TestHookOperationContext() {
- @Override
- public OperationContext modifyOperationContext(OperationContext original) {
- original.getSendingRequestEventHandler().addListener(
- new TransientErrorInjector(connectionRecognizer));
- return original;
- }
- });
- }
-
- @Test
- public void testTransientErrorOnDelete() throws Exception {
- // Need to do this test against a live storage account
- AzureBlobStorageTestAccount testAccount =
- AzureBlobStorageTestAccount.create();
- assumeNotNull(testAccount);
- try {
- NativeAzureFileSystem fs = testAccount.getFileSystem();
- injectTransientError(fs, new ConnectionRecognizer() {
- @Override
- public boolean isTargetConnection(HttpURLConnection connection) {
- return connection.getRequestMethod().equals("DELETE");
- }
- });
- Path testFile = new Path("/a/b");
- assertTrue(fs.createNewFile(testFile));
- assertTrue(fs.rename(testFile, new Path("/x")));
- } finally {
- testAccount.cleanup();
- }
- }
-
- private void writeAllThreeFile(NativeAzureFileSystem fs, Path testFile)
- throws IOException {
- byte[] buffer = new byte[ALL_THREE_FILE_SIZE];
- Arrays.fill(buffer, (byte)3);
- OutputStream stream = fs.create(testFile);
- stream.write(buffer);
- stream.close();
- }
-
- private void readAllThreeFile(NativeAzureFileSystem fs, Path testFile)
- throws IOException {
- byte[] buffer = new byte[ALL_THREE_FILE_SIZE];
- InputStream inStream = fs.open(testFile);
- assertEquals(buffer.length,
- inStream.read(buffer, 0, buffer.length));
- inStream.close();
- for (int i = 0; i < buffer.length; i++) {
- assertEquals(3, buffer[i]);
- }
- }
-
- @Test
- public void testTransientErrorOnCommitBlockList() throws Exception {
- // Need to do this test against a live storage account
- AzureBlobStorageTestAccount testAccount =
- AzureBlobStorageTestAccount.create();
- assumeNotNull(testAccount);
- try {
- NativeAzureFileSystem fs = testAccount.getFileSystem();
- injectTransientError(fs, new ConnectionRecognizer() {
- @Override
- public boolean isTargetConnection(HttpURLConnection connection) {
- return connection.getRequestMethod().equals("PUT")
- && connection.getURL().getQuery() != null
- && connection.getURL().getQuery().contains("blocklist");
- }
- });
- Path testFile = new Path("/a/b");
- writeAllThreeFile(fs, testFile);
- readAllThreeFile(fs, testFile);
- } finally {
- testAccount.cleanup();
- }
- }
-
- @Test
- public void testTransientErrorOnRead() throws Exception {
- // Need to do this test against a live storage account
- AzureBlobStorageTestAccount testAccount =
- AzureBlobStorageTestAccount.create();
- assumeNotNull(testAccount);
- try {
- NativeAzureFileSystem fs = testAccount.getFileSystem();
- Path testFile = new Path("/a/b");
- writeAllThreeFile(fs, testFile);
- injectTransientError(fs, new ConnectionRecognizer() {
- @Override
- public boolean isTargetConnection(HttpURLConnection connection) {
- return connection.getRequestMethod().equals("GET");
- }
- });
- readAllThreeFile(fs, testFile);
- } finally {
- testAccount.cleanup();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java
deleted file mode 100644
index ea17b62..0000000
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azure;
-
-import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_CHECK_BLOCK_MD5;
-import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_STORE_BLOB_MD5;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeNotNull;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.util.Arrays;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext;
-import org.junit.After;
-import org.junit.Test;
-
-import com.microsoft.azure.storage.Constants;
-import com.microsoft.azure.storage.OperationContext;
-import com.microsoft.azure.storage.ResponseReceivedEvent;
-import com.microsoft.azure.storage.StorageErrorCodeStrings;
-import com.microsoft.azure.storage.StorageEvent;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.BlockEntry;
-import com.microsoft.azure.storage.blob.BlockSearchMode;
-import com.microsoft.azure.storage.blob.CloudBlockBlob;
-import com.microsoft.azure.storage.core.Base64;
-
-/**
- * Test that we do proper data integrity validation with MD5 checks as
- * configured.
- */
-public class TestBlobDataValidation {
- private AzureBlobStorageTestAccount testAccount;
-
- @After
- public void tearDown() throws Exception {
- if (testAccount != null) {
- testAccount.cleanup();
- testAccount = null;
- }
- }
-
- /**
- * Test that by default we don't store the blob-level MD5.
- */
- @Test
- public void testBlobMd5StoreOffByDefault() throws Exception {
- testAccount = AzureBlobStorageTestAccount.create();
- testStoreBlobMd5(false);
- }
-
- /**
- * Test that we get blob-level MD5 storage and validation if we specify that
- * in the configuration.
- */
- @Test
- public void testStoreBlobMd5() throws Exception {
- Configuration conf = new Configuration();
- conf.setBoolean(KEY_STORE_BLOB_MD5, true);
- testAccount = AzureBlobStorageTestAccount.create(conf);
- testStoreBlobMd5(true);
- }
-
- private void testStoreBlobMd5(boolean expectMd5Stored) throws Exception {
- assumeNotNull(testAccount);
- // Write a test file.
- String testFileKey = "testFile";
- Path testFilePath = new Path("/" + testFileKey);
- OutputStream outStream = testAccount.getFileSystem().create(testFilePath);
- outStream.write(new byte[] { 5, 15 });
- outStream.close();
-
- // Check that we stored/didn't store the MD5 field as configured.
- CloudBlockBlob blob = testAccount.getBlobReference(testFileKey);
- blob.downloadAttributes();
- String obtainedMd5 = blob.getProperties().getContentMD5();
- if (expectMd5Stored) {
- assertNotNull(obtainedMd5);
- } else {
- assertNull("Expected no MD5, found: " + obtainedMd5, obtainedMd5);
- }
-
- // Mess with the content so it doesn't match the MD5.
- String newBlockId = Base64.encode(new byte[] { 55, 44, 33, 22 });
- blob.uploadBlock(newBlockId,
- new ByteArrayInputStream(new byte[] { 6, 45 }), 2);
- blob.commitBlockList(Arrays.asList(new BlockEntry[] { new BlockEntry(
- newBlockId, BlockSearchMode.UNCOMMITTED) }));
-
- // Now read back the content. If we stored the MD5 for the blob content
- // we should get a data corruption error.
- InputStream inStream = testAccount.getFileSystem().open(testFilePath);
- try {
- byte[] inBuf = new byte[100];
- while (inStream.read(inBuf) > 0){
- //nothing;
- }
- inStream.close();
- if (expectMd5Stored) {
- fail("Should've thrown because of data corruption.");
- }
- } catch (IOException ex) {
- if (!expectMd5Stored) {
- throw ex;
- }
- StorageException cause = (StorageException)ex.getCause();
- assertNotNull(cause);
- assertEquals("Unexpected cause: " + cause,
- StorageErrorCodeStrings.INVALID_MD5, cause.getErrorCode());
- }
- }
-
- /**
- * Test that by default we check block-level MD5.
- */
- @Test
- public void testCheckBlockMd5() throws Exception {
- testAccount = AzureBlobStorageTestAccount.create();
- testCheckBlockMd5(true);
- }
-
- /**
- * Test that we don't check block-level MD5 if we specify that in the
- * configuration.
- */
- @Test
- public void testDontCheckBlockMd5() throws Exception {
- Configuration conf = new Configuration();
- conf.setBoolean(KEY_CHECK_BLOCK_MD5, false);
- testAccount = AzureBlobStorageTestAccount.create(conf);
- testCheckBlockMd5(false);
- }
-
- /**
- * Connection inspector to check that MD5 fields for content is set/not set as
- * expected.
- */
- private static class ContentMD5Checker extends
- StorageEvent<ResponseReceivedEvent> {
- private final boolean expectMd5;
-
- public ContentMD5Checker(boolean expectMd5) {
- this.expectMd5 = expectMd5;
- }
-
- @Override
- public void eventOccurred(ResponseReceivedEvent eventArg) {
- HttpURLConnection connection = (HttpURLConnection) eventArg
- .getConnectionObject();
- if (isGetRange(connection)) {
- checkObtainedMd5(connection
- .getHeaderField(Constants.HeaderConstants.CONTENT_MD5));
- } else if (isPutBlock(connection)) {
- checkObtainedMd5(connection
- .getRequestProperty(Constants.HeaderConstants.CONTENT_MD5));
- }
- }
-
- private void checkObtainedMd5(String obtainedMd5) {
- if (expectMd5) {
- assertNotNull(obtainedMd5);
- } else {
- assertNull("Expected no MD5, found: " + obtainedMd5, obtainedMd5);
- }
- }
-
- private static boolean isPutBlock(HttpURLConnection connection) {
- return connection.getRequestMethod().equals("PUT")
- && connection.getURL().getQuery() != null
- && connection.getURL().getQuery().contains("blockid");
- }
-
- private static boolean isGetRange(HttpURLConnection connection) {
- return connection.getRequestMethod().equals("GET")
- && connection
- .getHeaderField(Constants.HeaderConstants.STORAGE_RANGE_HEADER) != null;
- }
- }
-
- private void testCheckBlockMd5(final boolean expectMd5Checked)
- throws Exception {
- assumeNotNull(testAccount);
- Path testFilePath = new Path("/testFile");
-
- // Add a hook to check that for GET/PUT requests we set/don't set
- // the block-level MD5 field as configured. I tried to do clever
- // testing by also messing with the raw data to see if we actually
- // validate the data as expected, but the HttpURLConnection wasn't
- // pluggable enough for me to do that.
- testAccount.getFileSystem().getStore()
- .addTestHookToOperationContext(new TestHookOperationContext() {
- @Override
- public OperationContext modifyOperationContext(
- OperationContext original) {
- original.getResponseReceivedEventHandler().addListener(
- new ContentMD5Checker(expectMd5Checked));
- return original;
- }
- });
-
- OutputStream outStream = testAccount.getFileSystem().create(testFilePath);
- outStream.write(new byte[] { 5, 15 });
- outStream.close();
-
- InputStream inStream = testAccount.getFileSystem().open(testFilePath);
- byte[] inBuf = new byte[100];
- while (inStream.read(inBuf) > 0){
- //nothing;
- }
- inStream.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java
index 6c49926..30c1028 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java
@@ -18,11 +18,6 @@
package org.apache.hadoop.fs.azure;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
@@ -42,7 +37,7 @@ import org.junit.Test;
/**
* Tests that we put the correct metadata on blobs created through WASB.
*/
-public class TestBlobMetadata {
+public class TestBlobMetadata extends AbstractWasbTestWithTimeout {
private AzureBlobStorageTestAccount testAccount;
private FileSystem fs;
private InMemoryBlockBlobStore backingStore;
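Moving TestBlobMetadata onto AbstractWasbTestWithTimeout is what allows the static org.junit.Assert imports above to be deleted. A rough sketch of such a base class, assuming its job is a global per-test timeout plus inherited assertions (the real Hadoop class may differ):

import org.junit.Assert;
import org.junit.Rule;
import org.junit.rules.Timeout;

// Sketch only: extending Assert lets subclasses call assertEquals(),
// assertTrue(), fail(), etc. without static imports, and the Rule
// applies one timeout to every test method in every subclass.
public abstract class WasbTestWithTimeoutSketch extends Assert {

  // Fail any test method that runs longer than ten minutes.
  @Rule
  public Timeout globalTimeout = new Timeout(10 * 60 * 1000);
}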
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java
index 07d4ebc..aca5f81 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java
@@ -33,9 +33,6 @@ import org.junit.Test;
import java.net.HttpURLConnection;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertEquals;
-
/**
* Tests for <code>BlobOperationDescriptor</code>.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java
deleted file mode 100644
index afb16ef..0000000
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azure;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
-
-import junit.framework.*;
-
-import org.junit.Test;
-
-
-/**
- * A simple benchmark to find out the difference in speed between block
- * and page blobs.
- */
-public class TestBlobTypeSpeedDifference extends TestCase {
- /**
- * Writes data to the given stream of the given size, flushing every
- * x bytes.
- */
- private static void writeTestFile(OutputStream writeStream,
- long size, long flushInterval) throws IOException {
- int bufferSize = (int) Math.min(1000, flushInterval);
- byte[] buffer = new byte[bufferSize];
- Arrays.fill(buffer, (byte) 7);
- int bytesWritten = 0;
- int bytesUnflushed = 0;
- while (bytesWritten < size) {
- int numberToWrite = (int) Math.min(bufferSize, size - bytesWritten);
- writeStream.write(buffer, 0, numberToWrite);
- bytesWritten += numberToWrite;
- bytesUnflushed += numberToWrite;
- if (bytesUnflushed >= flushInterval) {
- writeStream.flush();
- bytesUnflushed = 0;
- }
- }
- }
-
- private static class TestResult {
- final long timeTakenInMs;
- final long totalNumberOfRequests;
-
- TestResult(long timeTakenInMs, long totalNumberOfRequests) {
- this.timeTakenInMs = timeTakenInMs;
- this.totalNumberOfRequests = totalNumberOfRequests;
- }
- }
-
- /**
- * Writes data to the given file of the given size, flushing every
- * x bytes. Measure performance of that and return it.
- */
- private static TestResult writeTestFile(NativeAzureFileSystem fs, Path path,
- long size, long flushInterval) throws IOException {
- AzureFileSystemInstrumentation instrumentation =
- fs.getInstrumentation();
- long initialRequests = instrumentation.getCurrentWebResponses();
- Date start = new Date();
- OutputStream output = fs.create(path);
- writeTestFile(output, size, flushInterval);
- output.close();
- long finalRequests = instrumentation.getCurrentWebResponses();
- return new TestResult(new Date().getTime() - start.getTime(),
- finalRequests - initialRequests);
- }
-
- /**
- * Writes data to a block blob of the given size, flushing every
- * x bytes. Measure performance of that and return it.
- */
- private static TestResult writeBlockBlobTestFile(NativeAzureFileSystem fs,
- long size, long flushInterval) throws IOException {
- return writeTestFile(fs, new Path("/blockBlob"), size, flushInterval);
- }
-
- /**
- * Writes data to a page blob of the given size, flushing every
- * x bytes. Measure performance of that and return it.
- */
- private static TestResult writePageBlobTestFile(NativeAzureFileSystem fs,
- long size, long flushInterval) throws IOException {
- return writeTestFile(fs,
- AzureBlobStorageTestAccount.pageBlobPath("pageBlob"),
- size, flushInterval);
- }
-
- /**
- * Runs the benchmark over a small 10 KB file, flushing every 500 bytes.
- */
- @Test
- public void testTenKbFileFrequentFlush() throws Exception {
- AzureBlobStorageTestAccount testAccount =
- AzureBlobStorageTestAccount.create();
- if (testAccount == null) {
- return;
- }
- try {
- testForSizeAndFlushInterval(testAccount.getFileSystem(), 10 * 1000, 500);
- } finally {
- testAccount.cleanup();
- }
- }
-
- /**
- * Runs the benchmark for the given file size and flush frequency.
- */
- private static void testForSizeAndFlushInterval(NativeAzureFileSystem fs,
- final long size, final long flushInterval) throws IOException {
- for (int i = 0; i < 5; i++) {
- TestResult pageBlobResults = writePageBlobTestFile(fs, size, flushInterval);
- System.out.printf(
- "Page blob upload took %d ms. Total number of requests: %d.\n",
- pageBlobResults.timeTakenInMs, pageBlobResults.totalNumberOfRequests);
- TestResult blockBlobResults = writeBlockBlobTestFile(fs, size, flushInterval);
- System.out.printf(
- "Block blob upload took %d ms. Total number of requests: %d.\n",
- blockBlobResults.timeTakenInMs, blockBlobResults.totalNumberOfRequests);
- }
- }
-
- /**
- * Runs the benchmark for the given file size and flush frequency from the
- * command line.
- */
- public static void main(String argv[]) throws Exception {
- Configuration conf = new Configuration();
- long size = 10 * 1000 * 1000;
- long flushInterval = 2000;
- if (argv.length > 0) {
- size = Long.parseLong(argv[0]);
- }
- if (argv.length > 1) {
- flushInterval = Long.parseLong(argv[1]);
- }
- testForSizeAndFlushInterval((NativeAzureFileSystem)FileSystem.get(conf),
- size, flushInterval);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java
deleted file mode 100644
index 0ae4012..0000000
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java
+++ /dev/null
@@ -1,875 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azure;
-
-import java.io.EOFException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Random;
-import java.util.concurrent.Callable;
-
-import org.junit.FixMethodOrder;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-import org.junit.runners.MethodSorters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeNotNull;
-
-import static org.apache.hadoop.test.LambdaTestUtils.*;
-
-/**
- * Test semantics and performance of the original block blob input stream
- * (KEY_INPUT_STREAM_VERSION=1) and the new
- * <code>BlockBlobInputStream</code> (KEY_INPUT_STREAM_VERSION=2).
- */
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-
-public class TestBlockBlobInputStream extends AbstractWasbTestBase {
- private static final Logger LOG = LoggerFactory.getLogger(
- TestBlockBlobInputStream.class);
- private static final int KILOBYTE = 1024;
- private static final int MEGABYTE = KILOBYTE * KILOBYTE;
- private static final int TEST_FILE_SIZE = 6 * MEGABYTE;
- private static final Path TEST_FILE_PATH = new Path(
- "TestBlockBlobInputStream.txt");
-
- private AzureBlobStorageTestAccount accountUsingInputStreamV1;
- private AzureBlobStorageTestAccount accountUsingInputStreamV2;
- private long testFileLength;
-
- /**
- * Long test timeout.
- */
- @Rule
- public Timeout testTimeout = new Timeout(10 * 60 * 1000);
- private FileStatus testFileStatus;
- private Path hugefile;
-
- @Override
- public void setUp() throws Exception {
- super.setUp();
- Configuration conf = new Configuration();
- conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1);
-
- accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create(
- "testblockblobinputstream",
- EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
- conf,
- true);
-
- accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create(
- "testblockblobinputstream",
- EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class),
- null,
- true);
-
- assumeNotNull(accountUsingInputStreamV1);
- assumeNotNull(accountUsingInputStreamV2);
- hugefile = fs.makeQualified(TEST_FILE_PATH);
- try {
- testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
- testFileLength = testFileStatus.getLen();
- } catch (FileNotFoundException e) {
- // file doesn't exist
- testFileLength = 0;
- }
- }
-
- @Override
- protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
- Configuration conf = new Configuration();
- conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1);
-
- accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create(
- "testblockblobinputstream",
- EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
- conf,
- true);
-
- accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create(
- "testblockblobinputstream",
- EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class),
- null,
- true);
-
- assumeNotNull(accountUsingInputStreamV1);
- assumeNotNull(accountUsingInputStreamV2);
- return accountUsingInputStreamV1;
- }
-
- /**
- * Create a test file by repeating the characters in the alphabet.
- * @throws IOException
- */
- private void createTestFileAndSetLength() throws IOException {
- FileSystem fs = accountUsingInputStreamV1.getFileSystem();
-
- // To reduce test run time, the test file can be reused.
- if (fs.exists(TEST_FILE_PATH)) {
- testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
- testFileLength = testFileStatus.getLen();
- LOG.info("Reusing test file: {}", testFileStatus);
- return;
- }
-
- int sizeOfAlphabet = ('z' - 'a' + 1);
- byte[] buffer = new byte[26 * KILOBYTE];
- char character = 'a';
- for (int i = 0; i < buffer.length; i++) {
- buffer[i] = (byte) character;
- character = (character == 'z') ? 'a' : (char) ((int) character + 1);
- }
-
- LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH,
- TEST_FILE_SIZE);
- ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
-
- try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
- int bytesWritten = 0;
- while (bytesWritten < TEST_FILE_SIZE) {
- outputStream.write(buffer);
- bytesWritten += buffer.length;
- }
- LOG.info("Closing stream {}", outputStream);
- ContractTestUtils.NanoTimer closeTimer
- = new ContractTestUtils.NanoTimer();
- outputStream.close();
- closeTimer.end("time to close() output stream");
- }
- timer.end("time to write %d KB", TEST_FILE_SIZE / 1024);
- testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen();
- }
-
- void assumeHugeFileExists() throws IOException {
- ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
- FileStatus status = fs.getFileStatus(hugefile);
- ContractTestUtils.assertIsFile(hugefile, status);
- assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
- }
-
- /**
- * Calculate megabits per second from the specified values for bytes and
- * milliseconds.
- * @param bytes The number of bytes.
- * @param milliseconds The number of milliseconds.
- * @return The number of megabits per second.
- */
- private static double toMbps(long bytes, long milliseconds) {
- return bytes / 1000.0 * 8 / milliseconds;
- }
-
- @Test
- public void test_0100_CreateHugeFile() throws IOException {
- createTestFileAndSetLength();
- }
-
- @Test
- public void test_0200_BasicReadTest() throws Exception {
- assumeHugeFileExists();
-
- try (
- FSDataInputStream inputStreamV1
- = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
-
- FSDataInputStream inputStreamV2
- = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
- ) {
- byte[] bufferV1 = new byte[3 * MEGABYTE];
- byte[] bufferV2 = new byte[bufferV1.length];
-
- // v1 forward seek and read a kilobyte into first kilobyte of bufferV1
- inputStreamV1.seek(5 * MEGABYTE);
- int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, KILOBYTE);
- assertEquals(KILOBYTE, numBytesReadV1);
-
- // v2 forward seek and read a kilobyte into first kilobyte of bufferV2
- inputStreamV2.seek(5 * MEGABYTE);
- int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, KILOBYTE);
- assertEquals(KILOBYTE, numBytesReadV2);
-
- assertArrayEquals(bufferV1, bufferV2);
-
- int len = MEGABYTE;
- int offset = bufferV1.length - len;
-
- // v1 reverse seek and read a megabyte into last megabyte of bufferV1
- inputStreamV1.seek(3 * MEGABYTE);
- numBytesReadV1 = inputStreamV1.read(bufferV1, offset, len);
- assertEquals(len, numBytesReadV1);
-
- // v2 reverse seek and read a megabyte into last megabyte of bufferV2
- inputStreamV2.seek(3 * MEGABYTE);
- numBytesReadV2 = inputStreamV2.read(bufferV2, offset, len);
- assertEquals(len, numBytesReadV2);
-
- assertArrayEquals(bufferV1, bufferV2);
- }
- }
-
- @Test
- public void test_0201_RandomReadTest() throws Exception {
- assumeHugeFileExists();
-
- try (
- FSDataInputStream inputStreamV1
- = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
-
- FSDataInputStream inputStreamV2
- = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
- ) {
- final int bufferSize = 4 * KILOBYTE;
- byte[] bufferV1 = new byte[bufferSize];
- byte[] bufferV2 = new byte[bufferV1.length];
-
- verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
-
- inputStreamV1.seek(0);
- inputStreamV2.seek(0);
-
- verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
-
- verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
-
- int seekPosition = 2 * KILOBYTE;
- inputStreamV1.seek(seekPosition);
- inputStreamV2.seek(seekPosition);
-
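- // the 2 KB seek position is never read from; jump straight back to the start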
- inputStreamV1.seek(0);
- inputStreamV2.seek(0);
-
- verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
-
- verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
-
- verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
-
- seekPosition = 5 * KILOBYTE;
- inputStreamV1.seek(seekPosition);
- inputStreamV2.seek(seekPosition);
-
- verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
-
- seekPosition = 10 * KILOBYTE;
- inputStreamV1.seek(seekPosition);
- inputStreamV2.seek(seekPosition);
-
- verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
-
- verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
-
- seekPosition = 4100 * KILOBYTE;
- inputStreamV1.seek(seekPosition);
- inputStreamV2.seek(seekPosition);
-
- verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
- }
- }
-
- private void verifyConsistentReads(FSDataInputStream inputStreamV1,
- FSDataInputStream inputStreamV2,
- byte[] bufferV1,
- byte[] bufferV2) throws IOException {
- int size = bufferV1.length;
- final int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, size);
- assertEquals("Bytes read from V1 stream", size, numBytesReadV1);
-
- final int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, size);
- assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
-
- assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
- }
-
- /**
- * Validates the implementation of InputStream.markSupported
- * for version 1 of the block blob input stream.
- * @throws IOException
- */
- @Test
- public void test_0301_MarkSupportedV1() throws IOException {
- validateMarkSupported(accountUsingInputStreamV1.getFileSystem());
- }
-
- /**
- * Validates the implementation of InputStream.markSupported
- * for version 2 of the block blob input stream.
- * @throws IOException
- */
- @Test
- public void test_0302_MarkSupportedV2() throws IOException {
- validateMarkSupported(accountUsingInputStreamV2.getFileSystem());
- }
-
- private void validateMarkSupported(FileSystem fs) throws IOException {
- assumeHugeFileExists();
- try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
- assertTrue("mark is not supported", inputStream.markSupported());
- }
- }
-
- /**
- * Validates the implementation of InputStream.mark and reset
- * for version 1 of the block blob input stream.
- * @throws Exception
- */
- @Test
- public void test_0303_MarkAndResetV1() throws Exception {
- validateMarkAndReset(accountUsingInputStreamV1.getFileSystem());
- }
-
- /**
- * Validates the implementation of InputStream.mark and reset
- * for version 2 of the block blob input stream.
- * @throws Exception
- */
- @Test
- public void test_0304_MarkAndResetV2() throws Exception {
- validateMarkAndReset(accountUsingInputStreamV2.getFileSystem());
- }
-
- private void validateMarkAndReset(FileSystem fs) throws Exception {
- assumeHugeFileExists();
- try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
- inputStream.mark(KILOBYTE - 1);
-
- byte[] buffer = new byte[KILOBYTE];
- int bytesRead = inputStream.read(buffer);
- assertEquals(buffer.length, bytesRead);
-
- inputStream.reset();
- assertEquals("rest -> pos 0", 0, inputStream.getPos());
-
- inputStream.mark(8 * KILOBYTE - 1);
-
- buffer = new byte[8 * KILOBYTE];
- bytesRead = inputStream.read(buffer);
- assertEquals(buffer.length, bytesRead);
-
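- // the 8 KB read exceeded the mark's read limit, so the mark should be
- // invalid and reset() must fail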
- intercept(IOException.class,
- "Resetting to invalid mark",
- new Callable<FSDataInputStream>() {
- @Override
- public FSDataInputStream call() throws Exception {
- inputStream.reset();
- return inputStream;
- }
- }
- );
- }
- }
-
- /**
- * Validates the implementation of Seekable.seekToNewSource, which should
- * return false for version 1 of the block blob input stream.
- * @throws IOException
- */
- @Test
- public void test_0305_SeekToNewSourceV1() throws IOException {
- validateSeekToNewSource(accountUsingInputStreamV1.getFileSystem());
- }
-
- /**
- * Validates the implementation of Seekable.seekToNewSource, which should
- * return false for version 2 of the block blob input stream.
- * @throws IOException
- */
- @Test
- public void test_0306_SeekToNewSourceV2() throws IOException {
- validateSeekToNewSource(accountUsingInputStreamV2.getFileSystem());
- }
-
- private void validateSeekToNewSource(FileSystem fs) throws IOException {
- assumeHugeFileExists();
- try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
- assertFalse(inputStream.seekToNewSource(0));
- }
- }
-
- /**
- * Validates the implementation of InputStream.skip and ensures there is no
- * network I/O for version 1 of the block blob input stream.
- * @throws Exception
- */
- @Test
- public void test_0307_SkipBoundsV1() throws Exception {
- validateSkipBounds(accountUsingInputStreamV1.getFileSystem());
- }
-
- /**
- * Validates the implementation of InputStream.skip and ensures there is no
- * network I/O for version 2 of the block blob input stream.
- * @throws Exception
- */
- @Test
- public void test_0308_SkipBoundsV2() throws Exception {
- validateSkipBounds(accountUsingInputStreamV2.getFileSystem());
- }
-
- private void validateSkipBounds(FileSystem fs) throws Exception {
- assumeHugeFileExists();
- try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
- NanoTimer timer = new NanoTimer();
-
- long skipped = inputStream.skip(-1);
- assertEquals(0, skipped);
-
- skipped = inputStream.skip(0);
- assertEquals(0, skipped);
-
- assertTrue(testFileLength > 0);
-
- skipped = inputStream.skip(testFileLength);
- assertEquals(testFileLength, skipped);
-
- intercept(EOFException.class,
- new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- return inputStream.skip(1);
- }
- }
- );
- long elapsedTimeMs = timer.elapsedTimeMs();
- assertTrue(
- String.format(
- "There should not be any network I/O (elapsedTimeMs=%1$d).",
- elapsedTimeMs),
- elapsedTimeMs < 20);
- }
- }
-
- /**
- * Validates the implementation of Seekable.seek and ensures there is no
- * network I/O for forward seek.
- * @throws Exception
- */
- @Test
- public void test_0309_SeekBoundsV1() throws Exception {
- validateSeekBounds(accountUsingInputStreamV1.getFileSystem());
- }
-
- /**
- * Validates the implementation of Seekable.seek and ensures there is no
- * network I/O for forward seek.
- * @throws Exception
- */
- @Test
- public void test_0310_SeekBoundsV2() throws Exception {
- validateSeekBounds(accountUsingInputStreamV2.getFileSystem());
- }
-
- private void validateSeekBounds(FileSystem fs) throws Exception {
- assumeHugeFileExists();
- try (
- FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
- ) {
- NanoTimer timer = new NanoTimer();
-
- inputStream.seek(0);
- assertEquals(0, inputStream.getPos());
-
- intercept(EOFException.class,
- FSExceptionMessages.NEGATIVE_SEEK,
- new Callable<FSDataInputStream>() {
- @Override
- public FSDataInputStream call() throws Exception {
- inputStream.seek(-1);
- return inputStream;
- }
- }
- );
-
- assertTrue("Test file length only " + testFileLength, testFileLength > 0);
- inputStream.seek(testFileLength);
- assertEquals(testFileLength, inputStream.getPos());
-
- intercept(EOFException.class,
- FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
- new Callable<FSDataInputStream>() {
- @Override
- public FSDataInputStream call() throws Exception {
- inputStream.seek(testFileLength + 1);
- return inputStream;
- }
- }
- );
-
- long elapsedTimeMs = timer.elapsedTimeMs();
- assertTrue(
- String.format(
- "There should not be any network I/O (elapsedTimeMs=%1$d).",
- elapsedTimeMs),
- elapsedTimeMs < 20);
- }
- }
-
- /**
- * Validates the implementation of Seekable.seek, Seekable.getPos,
- * and InputStream.available.
- * @throws Exception
- */
- @Test
- public void test_0311_SeekAndAvailableAndPositionV1() throws Exception {
- validateSeekAndAvailableAndPosition(
- accountUsingInputStreamV1.getFileSystem());
- }
-
- /**
- * Validates the implementation of Seekable.seek, Seekable.getPos,
- * and InputStream.available.
- * @throws Exception
- */
- @Test
- public void test_0312_SeekAndAvailableAndPositionV2() throws Exception {
- validateSeekAndAvailableAndPosition(
- accountUsingInputStreamV2.getFileSystem());
- }
-
- private void validateSeekAndAvailableAndPosition(FileSystem fs)
- throws Exception {
- assumeHugeFileExists();
- try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
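- // the test file cycles 'a'..'z', so the byte at offset N is ('a' + N % 26)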
- byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
- byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
- byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
- byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};
- byte[] buffer = new byte[3];
-
- int bytesRead = inputStream.read(buffer);
- assertEquals(buffer.length, bytesRead);
- assertArrayEquals(expected1, buffer);
- assertEquals(buffer.length, inputStream.getPos());
- assertEquals(testFileLength - inputStream.getPos(),
- inputStream.available());
-
- bytesRead = inputStream.read(buffer);
- assertEquals(buffer.length, bytesRead);
- assertArrayEquals(expected2, buffer);
- assertEquals(2 * buffer.length, inputStream.getPos());
- assertEquals(testFileLength - inputStream.getPos(),
- inputStream.available());
-
- // reverse seek
- int seekPos = 0;
- inputStream.seek(seekPos);
-
- bytesRead = inputStream.read(buffer);
- assertEquals(buffer.length, bytesRead);
- assertArrayEquals(expected1, buffer);
- assertEquals(buffer.length + seekPos, inputStream.getPos());
- assertEquals(testFileLength - inputStream.getPos(),
- inputStream.available());
-
- // reverse seek
- seekPos = 1;
- inputStream.seek(seekPos);
-
- bytesRead = inputStream.read(buffer);
- assertEquals(buffer.length, bytesRead);
- assertArrayEquals(expected3, buffer);
- assertEquals(buffer.length + seekPos, inputStream.getPos());
- assertEquals(testFileLength - inputStream.getPos(),
- inputStream.available());
-
- // forward seek
- seekPos = 6;
- inputStream.seek(seekPos);
-
- bytesRead = inputStream.read(buffer);
- assertEquals(buffer.length, bytesRead);
- assertArrayEquals(expected4, buffer);
- assertEquals(buffer.length + seekPos, inputStream.getPos());
- assertEquals(testFileLength - inputStream.getPos(),
- inputStream.available());
- }
- }
-
- /**
- * Validates the implementation of InputStream.skip, Seekable.getPos,
- * and InputStream.available.
- * @throws IOException
- */
- @Test
- public void test_0313_SkipAndAvailableAndPositionV1() throws IOException {
- validateSkipAndAvailableAndPosition(
- accountUsingInputStreamV1.getFileSystem());
- }
-
- /**
- * Validates the implementation of InputStream.skip, Seekable.getPos,
- * and InputStream.available.
- * @throws IOException
- */
- @Test
- public void test_0314_SkipAndAvailableAndPositionV2() throws IOException {
- validateSkipAndAvailableAndPosition(
- accountUsingInputStreamV2.getFileSystem());
- }
-
- private void validateSkipAndAvailableAndPosition(FileSystem fs)
- throws IOException {
- assumeHugeFileExists();
- try (
- FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
- ) {
- byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
- byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
- byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
- byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};
-
- assertEquals(testFileLength, inputStream.available());
- assertEquals(0, inputStream.getPos());
-
- int n = 3;
- long skipped = inputStream.skip(n);
-
- assertEquals(skipped, inputStream.getPos());
- assertEquals(testFileLength - inputStream.getPos(),
- inputStream.available());
- assertEquals(skipped, n);
-
- byte[] buffer = new byte[3];
- int bytesRead = inputStream.read(buffer);
- assertEquals(buffer.length, bytesRead);
- assertArrayEquals(expected2, buffer);
- assertEquals(buffer.length + skipped, inputStream.getPos());
- assertEquals(testFileLength - inputStream.getPos(),
- inputStream.available());
-
- // does skip still work after seek?
- int seekPos = 1;
- inputStream.seek(seekPos);
-
- bytesRead = inputStream.read(buffer);
- assertEquals(buffer.length, bytesRead);
- assertArrayEquals(expected3, buffer);
- assertEquals(buffer.length + seekPos, inputStream.getPos());
- assertEquals(testFileLength - inputStream.getPos(),
- inputStream.available());
-
- long currentPosition = inputStream.getPos();
- n = 2;
- skipped = inputStream.skip(n);
-
- assertEquals(currentPosition + skipped, inputStream.getPos());
- assertEquals(testFileLength - inputStream.getPos(),
- inputStream.available());
- assertEquals(skipped, n);
-
- bytesRead = inputStream.read(buffer);
- assertEquals(buffer.length, bytesRead);
- assertArrayEquals(expected4, buffer);
- assertEquals(buffer.length + skipped + currentPosition,
- inputStream.getPos());
- assertEquals(testFileLength - inputStream.getPos(),
- inputStream.available());
- }
- }
-
- /**
- * Ensures parity in the performance of sequential read for
- * version 1 and version 2 of the block blob input stream.
- * @throws IOException
- */
- @Test
- public void test_0315_SequentialReadPerformance() throws IOException {
- assumeHugeFileExists();
- final int maxAttempts = 10;
- final double maxAcceptableRatio = 1.01;
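- // retried up to maxAttempts times; v2 must finish within 1% of v1's time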
- double v1ElapsedMs = 0, v2ElapsedMs = 0;
- double ratio = Double.MAX_VALUE;
- for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
- v1ElapsedMs = sequentialRead(1,
- accountUsingInputStreamV1.getFileSystem(), false);
- v2ElapsedMs = sequentialRead(2,
- accountUsingInputStreamV2.getFileSystem(), false);
- ratio = v2ElapsedMs / v1ElapsedMs;
- LOG.info(String.format(
- "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f",
- (long) v1ElapsedMs,
- (long) v2ElapsedMs,
- ratio));
- }
- assertTrue(String.format(
- "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d,"
- + " v2ElapsedMs=%2$d, ratio=%3$.2f",
- (long) v1ElapsedMs,
- (long) v2ElapsedMs,
- ratio),
- ratio < maxAcceptableRatio);
- }
-
- /**
- * Ensures parity in the performance of sequential read after reverse seek for
- * version 2 of the block blob input stream.
- * @throws IOException
- */
- @Test
- public void test_0316_SequentialReadAfterReverseSeekPerformanceV2()
- throws IOException {
- assumeHugeFileExists();
- final int maxAttempts = 10;
- final double maxAcceptableRatio = 1.01;
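- // the pass after a reverse seek must finish within 1% of the first pass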
- double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0;
- double ratio = Double.MAX_VALUE;
- for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
- beforeSeekElapsedMs = sequentialRead(2,
- accountUsingInputStreamV2.getFileSystem(), false);
- afterSeekElapsedMs = sequentialRead(2,
- accountUsingInputStreamV2.getFileSystem(), true);
- ratio = afterSeekElapsedMs / beforeSeekElapsedMs;
- LOG.info(String.format(
- "beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d, ratio=%3$.2f",
- (long) beforeSeekElapsedMs,
- (long) afterSeekElapsedMs,
- ratio));
- }
- assertTrue(String.format(
- "Performance of version 2 after reverse seek is not acceptable:"
- + " beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d,"
- + " ratio=%3$.2f",
- (long) beforeSeekElapsedMs,
- (long) afterSeekElapsedMs,
- ratio),
- ratio < maxAcceptableRatio);
- }
-
- private long sequentialRead(int version,
- FileSystem fs,
- boolean afterReverseSeek) throws IOException {
- byte[] buffer = new byte[16 * KILOBYTE];
- long totalBytesRead = 0;
- long bytesRead = 0;
-
- try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
- if (afterReverseSeek) {
- // read ~4 MB before seeking back, so the timed pass below follows a
- // genuine reverse seek (a do/while, since bytesRead starts at 0)
- do {
- bytesRead = inputStream.read(buffer);
- totalBytesRead += bytesRead;
- } while (bytesRead > 0 && totalBytesRead < 4 * MEGABYTE);
- totalBytesRead = 0;
- inputStream.seek(0);
- }
-
- NanoTimer timer = new NanoTimer();
- while ((bytesRead = inputStream.read(buffer)) > 0) {
- totalBytesRead += bytesRead;
- }
- long elapsedTimeMs = timer.elapsedTimeMs();
-
- LOG.info(String.format(
- "v%1$d: bytesRead=%2$d, elapsedMs=%3$d, Mbps=%4$.2f,"
- + " afterReverseSeek=%5$s",
- version,
- totalBytesRead,
- elapsedTimeMs,
- toMbps(totalBytesRead, elapsedTimeMs),
- afterReverseSeek));
-
- assertEquals(testFileLength, totalBytesRead);
- return elapsedTimeMs;
- }
- }
-
- @Test
- public void test_0317_RandomReadPerformance() throws IOException {
- assumeHugeFileExists();
- final int maxAttempts = 10;
- final double maxAcceptableRatio = 0.10;
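- // v2 random reads must take under 10% of v1's time, i.e. be 10x faster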
- double v1ElapsedMs = 0, v2ElapsedMs = 0;
- double ratio = Double.MAX_VALUE;
- for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
- v1ElapsedMs = randomRead(1,
- accountUsingInputStreamV1.getFileSystem());
- v2ElapsedMs = randomRead(2,
- accountUsingInputStreamV2.getFileSystem());
- ratio = v2ElapsedMs / v1ElapsedMs;
- LOG.info(String.format(
- "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f",
- (long) v1ElapsedMs,
- (long) v2ElapsedMs,
- ratio));
- }
- assertTrue(String.format(
- "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d,"
- + " v2ElapsedMs=%2$d, ratio=%3$.2f",
- (long) v1ElapsedMs,
- (long) v2ElapsedMs,
- ratio),
- ratio < maxAcceptableRatio);
- }
-
- private long randomRead(int version, FileSystem fs) throws IOException {
- assumeHugeFileExists();
- final int minBytesToRead = 2 * MEGABYTE;
- Random random = new Random();
- byte[] buffer = new byte[8 * KILOBYTE];
- long totalBytesRead = 0;
- long bytesRead = 0;
- try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
- NanoTimer timer = new NanoTimer();
-
- do {
- bytesRead = inputStream.read(buffer);
- if (bytesRead > 0) {
- totalBytesRead += bytesRead;
- }
- // hop to a random offset before the next read
- inputStream.seek(random.nextInt(
- (int) (testFileLength - buffer.length)));
- } while (bytesRead > 0 && totalBytesRead < minBytesToRead);
-
- long elapsedTimeMs = timer.elapsedTimeMs();
-
- LOG.info(String.format(
- "v%1$d: totalBytesRead=%2$d, elapsedTimeMs=%3$d, Mbps=%4$.2f",
- version,
- totalBytesRead,
- elapsedTimeMs,
- toMbps(totalBytesRead, elapsedTimeMs)));
-
- assertTrue(minBytesToRead <= totalBytesRead);
-
- return elapsedTimeMs;
- }
- }
-
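- // named so that name-ordered test execution runs this last, deleting the
- // shared test file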
- @Test
- public void test_999_DeleteHugeFiles() throws IOException {
- ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
- fs.delete(TEST_FILE_PATH, false);
- timer.end("time to delete %s", TEST_FILE_PATH);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java
index 307e5af..c2496d7 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java
@@ -21,13 +21,10 @@ package org.apache.hadoop.fs.azure;
import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
import org.junit.Test;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
-
/**
* Tests for <code>ClientThrottlingAnalyzer</code>.
*/
-public class TestClientThrottlingAnalyzer {
+public class TestClientThrottlingAnalyzer extends AbstractWasbTestWithTimeout {
private static final int ANALYSIS_PERIOD = 1000;
private static final int ANALYSIS_PERIOD_PLUS_10_PERCENT = ANALYSIS_PERIOD
+ ANALYSIS_PERIOD / 10;