You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/09/15 16:39:44 UTC
[19/20] hadoop git commit: HADOOP-14553. Add (parallelized)
integration tests to hadoop-azure Contributed by Steve Loughran
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6b08f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobDataValidation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobDataValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobDataValidation.java
new file mode 100644
index 0000000..0aa9393
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobDataValidation.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_CHECK_BLOCK_MD5;
+import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_STORE_BLOB_MD5;
+import static org.junit.Assume.assumeNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.util.Arrays;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+
+import org.junit.After;
+import org.junit.Test;
+
+import com.microsoft.azure.storage.Constants;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.ResponseReceivedEvent;
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
+import com.microsoft.azure.storage.StorageEvent;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockSearchMode;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.core.Base64;
+
+/**
+ * Test that we do proper data integrity validation with MD5 checks as
+ * configured.
+ */
+public class ITestBlobDataValidation extends AbstractWasbTestWithTimeout {
+ private AzureBlobStorageTestAccount testAccount;
+
+ @After
+ public void tearDown() throws Exception {
+ testAccount = AzureTestUtils.cleanupTestAccount(testAccount);
+ }
+
+ /**
+ * Test that by default we don't store the blob-level MD5.
+ */
+ @Test
+ public void testBlobMd5StoreOffByDefault() throws Exception {
+ testAccount = AzureBlobStorageTestAccount.create();
+ testStoreBlobMd5(false);
+ }
+
+ /**
+ * Test that we get blob-level MD5 storage and validation if we specify that
+ * in the configuration.
+ */
+ @Test
+ public void testStoreBlobMd5() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(KEY_STORE_BLOB_MD5, true);
+ testAccount = AzureBlobStorageTestAccount.create(conf);
+ testStoreBlobMd5(true);
+ }
+
+ /**
+ * Trims a suffix/prefix from the given string. For example if
+ * s is given as "/xy" and toTrim is "/", this method returns "xy"
+ */
+ private static String trim(String s, String toTrim) {
+ return StringUtils.removeEnd(StringUtils.removeStart(s, toTrim),
+ toTrim);
+ }
+
+ private void testStoreBlobMd5(boolean expectMd5Stored) throws Exception {
+ assumeNotNull(testAccount);
+ // Write a test file.
+ NativeAzureFileSystem fs = testAccount.getFileSystem();
+ Path testFilePath = AzureTestUtils.pathForTests(fs,
+ methodName.getMethodName());
+ String testFileKey = trim(testFilePath.toUri().getPath(), "/");
+ OutputStream outStream = fs.create(testFilePath);
+ outStream.write(new byte[] { 5, 15 });
+ outStream.close();
+
+ // Check that we stored/didn't store the MD5 field as configured.
+ CloudBlockBlob blob = testAccount.getBlobReference(testFileKey);
+ blob.downloadAttributes();
+ String obtainedMd5 = blob.getProperties().getContentMD5();
+ if (expectMd5Stored) {
+ assertNotNull(obtainedMd5);
+ } else {
+ assertNull("Expected no MD5, found: " + obtainedMd5, obtainedMd5);
+ }
+
+ // Mess with the content so it doesn't match the MD5.
+ String newBlockId = Base64.encode(new byte[] { 55, 44, 33, 22 });
+ blob.uploadBlock(newBlockId,
+ new ByteArrayInputStream(new byte[] { 6, 45 }), 2);
+ blob.commitBlockList(Arrays.asList(new BlockEntry[] { new BlockEntry(
+ newBlockId, BlockSearchMode.UNCOMMITTED) }));
+
+ // Now read back the content. If we stored the MD5 for the blob content
+ // we should get a data corruption error.
+ InputStream inStream = fs.open(testFilePath);
+ try {
+ byte[] inBuf = new byte[100];
+ while (inStream.read(inBuf) > 0){
+ //nothing;
+ }
+ inStream.close();
+ if (expectMd5Stored) {
+ fail("Should've thrown because of data corruption.");
+ }
+ } catch (IOException ex) {
+ if (!expectMd5Stored) {
+ throw ex;
+ }
+ StorageException cause = (StorageException)ex.getCause();
+ assertNotNull(cause);
+ assertEquals("Unexpected cause: " + cause,
+ StorageErrorCodeStrings.INVALID_MD5, cause.getErrorCode());
+ }
+ }
+
+ /**
+ * Test that by default we check block-level MD5.
+ */
+ @Test
+ public void testCheckBlockMd5() throws Exception {
+ testAccount = AzureBlobStorageTestAccount.create();
+ testCheckBlockMd5(true);
+ }
+
+ /**
+ * Test that we don't check block-level MD5 if we specify that in the
+ * configuration.
+ */
+ @Test
+ public void testDontCheckBlockMd5() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(KEY_CHECK_BLOCK_MD5, false);
+ testAccount = AzureBlobStorageTestAccount.create(conf);
+ testCheckBlockMd5(false);
+ }
+
+ /**
+ * Connection inspector to check that MD5 fields for content is set/not set as
+ * expected.
+ */
+ private static class ContentMD5Checker extends
+ StorageEvent<ResponseReceivedEvent> {
+ private final boolean expectMd5;
+
+ public ContentMD5Checker(boolean expectMd5) {
+ this.expectMd5 = expectMd5;
+ }
+
+ @Override
+ public void eventOccurred(ResponseReceivedEvent eventArg) {
+ HttpURLConnection connection = (HttpURLConnection) eventArg
+ .getConnectionObject();
+ if (isGetRange(connection)) {
+ checkObtainedMd5(connection
+ .getHeaderField(Constants.HeaderConstants.CONTENT_MD5));
+ } else if (isPutBlock(connection)) {
+ checkObtainedMd5(connection
+ .getRequestProperty(Constants.HeaderConstants.CONTENT_MD5));
+ }
+ }
+
+ private void checkObtainedMd5(String obtainedMd5) {
+ if (expectMd5) {
+ assertNotNull(obtainedMd5);
+ } else {
+ assertNull("Expected no MD5, found: " + obtainedMd5, obtainedMd5);
+ }
+ }
+
+ private static boolean isPutBlock(HttpURLConnection connection) {
+ return connection.getRequestMethod().equals("PUT")
+ && connection.getURL().getQuery() != null
+ && connection.getURL().getQuery().contains("blockid");
+ }
+
+ private static boolean isGetRange(HttpURLConnection connection) {
+ return connection.getRequestMethod().equals("GET")
+ && connection
+ .getHeaderField(Constants.HeaderConstants.STORAGE_RANGE_HEADER) != null;
+ }
+ }
+
+ private void testCheckBlockMd5(final boolean expectMd5Checked)
+ throws Exception {
+ assumeNotNull(testAccount);
+ Path testFilePath = new Path("/testFile");
+
+ // Add a hook to check that for GET/PUT requests we set/don't set
+ // the block-level MD5 field as configured. I tried to do clever
+ // testing by also messing with the raw data to see if we actually
+ // validate the data as expected, but the HttpURLConnection wasn't
+ // pluggable enough for me to do that.
+ testAccount.getFileSystem().getStore()
+ .addTestHookToOperationContext(new TestHookOperationContext() {
+ @Override
+ public OperationContext modifyOperationContext(
+ OperationContext original) {
+ original.getResponseReceivedEventHandler().addListener(
+ new ContentMD5Checker(expectMd5Checked));
+ return original;
+ }
+ });
+
+ OutputStream outStream = testAccount.getFileSystem().create(testFilePath);
+ outStream.write(new byte[] { 5, 15 });
+ outStream.close();
+
+ InputStream inStream = testAccount.getFileSystem().open(testFilePath);
+ byte[] inBuf = new byte[100];
+ while (inStream.read(inBuf) > 0){
+ //nothing;
+ }
+ inStream.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6b08f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobTypeSpeedDifference.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobTypeSpeedDifference.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobTypeSpeedDifference.java
new file mode 100644
index 0000000..b46ad5b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobTypeSpeedDifference.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Date;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
+
+
+/**
+ * A simple benchmark to find out the difference in speed between block
+ * and page blobs.
+ */
+public class ITestBlobTypeSpeedDifference extends AbstractWasbTestBase {
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create();
+ }
+
+ /**
+ * Writes data to the given stream of the given size, flushing every
+ * x bytes.
+ */
+ private static void writeTestFile(OutputStream writeStream,
+ long size, long flushInterval) throws IOException {
+ int bufferSize = (int) Math.min(1000, flushInterval);
+ byte[] buffer = new byte[bufferSize];
+ Arrays.fill(buffer, (byte) 7);
+ int bytesWritten = 0;
+ int bytesUnflushed = 0;
+ while (bytesWritten < size) {
+ int numberToWrite = (int) Math.min(bufferSize, size - bytesWritten);
+ writeStream.write(buffer, 0, numberToWrite);
+ bytesWritten += numberToWrite;
+ bytesUnflushed += numberToWrite;
+ if (bytesUnflushed >= flushInterval) {
+ writeStream.flush();
+ bytesUnflushed = 0;
+ }
+ }
+ }
+
+ private static class TestResult {
+ final long timeTakenInMs;
+ final long totalNumberOfRequests;
+
+ TestResult(long timeTakenInMs, long totalNumberOfRequests) {
+ this.timeTakenInMs = timeTakenInMs;
+ this.totalNumberOfRequests = totalNumberOfRequests;
+ }
+ }
+
+ /**
+ * Writes data to the given file of the given size, flushing every
+ * x bytes. Measure performance of that and return it.
+ */
+ private static TestResult writeTestFile(NativeAzureFileSystem fs, Path path,
+ long size, long flushInterval) throws IOException {
+ AzureFileSystemInstrumentation instrumentation =
+ fs.getInstrumentation();
+ long initialRequests = instrumentation.getCurrentWebResponses();
+ Date start = new Date();
+ OutputStream output = fs.create(path);
+ writeTestFile(output, size, flushInterval);
+ output.close();
+ long finalRequests = instrumentation.getCurrentWebResponses();
+ return new TestResult(new Date().getTime() - start.getTime(),
+ finalRequests - initialRequests);
+ }
+
+ /**
+ * Writes data to a block blob of the given size, flushing every
+ * x bytes. Measure performance of that and return it.
+ */
+ private static TestResult writeBlockBlobTestFile(NativeAzureFileSystem fs,
+ long size, long flushInterval) throws IOException {
+ return writeTestFile(fs, new Path("/blockBlob"), size, flushInterval);
+ }
+
+ /**
+ * Writes data to a page blob of the given size, flushing every
+ * x bytes. Measure performance of that and return it.
+ */
+ private static TestResult writePageBlobTestFile(NativeAzureFileSystem fs,
+ long size, long flushInterval) throws IOException {
+ Path testFile = AzureTestUtils.blobPathForTests(fs,
+ "writePageBlobTestFile");
+ return writeTestFile(fs,
+ testFile,
+ size, flushInterval);
+ }
+
+ /**
+ * Runs the benchmark over a small 10 KB file, flushing every 500 bytes.
+ */
+ @Test
+ public void testTenKbFileFrequentFlush() throws Exception {
+ testForSizeAndFlushInterval(getFileSystem(), 10 * 1000, 500);
+ }
+
+ /**
+ * Runs the benchmark for the given file size and flush frequency.
+ */
+ private static void testForSizeAndFlushInterval(NativeAzureFileSystem fs,
+ final long size, final long flushInterval) throws IOException {
+ for (int i = 0; i < 5; i++) {
+ TestResult pageBlobResults = writePageBlobTestFile(fs, size, flushInterval);
+ System.out.printf(
+ "Page blob upload took %d ms. Total number of requests: %d.\n",
+ pageBlobResults.timeTakenInMs, pageBlobResults.totalNumberOfRequests);
+ TestResult blockBlobResults = writeBlockBlobTestFile(fs, size, flushInterval);
+ System.out.printf(
+ "Block blob upload took %d ms. Total number of requests: %d.\n",
+ blockBlobResults.timeTakenInMs, blockBlobResults.totalNumberOfRequests);
+ }
+ }
+
+ /**
+ * Runs the benchmark for the given file size and flush frequency from the
+ * command line.
+ */
+ public static void main(String[] argv) throws Exception {
+ Configuration conf = new Configuration();
+ long size = 10 * 1000 * 1000;
+ long flushInterval = 2000;
+ if (argv.length > 0) {
+ size = Long.parseLong(argv[0]);
+ }
+ if (argv.length > 1) {
+ flushInterval = Long.parseLong(argv[1]);
+ }
+ testForSizeAndFlushInterval(
+ (NativeAzureFileSystem) FileSystem.get(conf),
+ size,
+ flushInterval);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6b08f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
new file mode 100644
index 0000000..07a13df
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
@@ -0,0 +1,874 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
+
+import static org.junit.Assume.assumeNotNull;
+
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+
+/**
+ * Test semantics and performance of the original block blob input stream
+ * (KEY_INPUT_STREAM_VERSION=1) and the new
+ * <code>BlockBlobInputStream</code> (KEY_INPUT_STREAM_VERSION=2).
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+
+public class ITestBlockBlobInputStream extends AbstractAzureScaleTest {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ITestBlockBlobInputStream.class);
+ private static final int KILOBYTE = 1024;
+ private static final int MEGABYTE = KILOBYTE * KILOBYTE;
+ private static final int TEST_FILE_SIZE = 6 * MEGABYTE;
+ private static final Path TEST_FILE_PATH = new Path(
+ "TestBlockBlobInputStream.txt");
+
+ private AzureBlobStorageTestAccount accountUsingInputStreamV1;
+ private AzureBlobStorageTestAccount accountUsingInputStreamV2;
+ private long testFileLength;
+
+
+
+ private FileStatus testFileStatus;
+ private Path hugefile;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ Configuration conf = new Configuration();
+ conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1);
+
+ accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create(
+ "testblockblobinputstream",
+ EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+ conf,
+ true);
+
+ accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create(
+ "testblockblobinputstream",
+ EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class),
+ null,
+ true);
+
+ assumeNotNull(accountUsingInputStreamV1);
+ assumeNotNull(accountUsingInputStreamV2);
+ hugefile = fs.makeQualified(TEST_FILE_PATH);
+ try {
+ testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
+ testFileLength = testFileStatus.getLen();
+ } catch (FileNotFoundException e) {
+ // file doesn't exist
+ testFileLength = 0;
+ }
+ }
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1);
+
+ accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create(
+ "testblockblobinputstream",
+ EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+ conf,
+ true);
+
+ accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create(
+ "testblockblobinputstream",
+ EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class),
+ null,
+ true);
+
+ assumeNotNull(accountUsingInputStreamV1);
+ assumeNotNull(accountUsingInputStreamV2);
+ return accountUsingInputStreamV1;
+ }
+
+ /**
+ * Create a test file by repeating the characters in the alphabet.
+ * @throws IOException
+ */
+ private void createTestFileAndSetLength() throws IOException {
+ FileSystem fs = accountUsingInputStreamV1.getFileSystem();
+
+ // To reduce test run time, the test file can be reused.
+ if (fs.exists(TEST_FILE_PATH)) {
+ testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
+ testFileLength = testFileStatus.getLen();
+ LOG.info("Reusing test file: {}", testFileStatus);
+ return;
+ }
+
+ int sizeOfAlphabet = ('z' - 'a' + 1);
+ byte[] buffer = new byte[26 * KILOBYTE];
+ char character = 'a';
+ for (int i = 0; i < buffer.length; i++) {
+ buffer[i] = (byte) character;
+ character = (character == 'z') ? 'a' : (char) ((int) character + 1);
+ }
+
+ LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH,
+ TEST_FILE_SIZE);
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+
+ try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
+ int bytesWritten = 0;
+ while (bytesWritten < TEST_FILE_SIZE) {
+ outputStream.write(buffer);
+ bytesWritten += buffer.length;
+ }
+ LOG.info("Closing stream {}", outputStream);
+ ContractTestUtils.NanoTimer closeTimer
+ = new ContractTestUtils.NanoTimer();
+ outputStream.close();
+ closeTimer.end("time to close() output stream");
+ }
+ timer.end("time to write %d KB", TEST_FILE_SIZE / 1024);
+ testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen();
+ }
+
+ void assumeHugeFileExists() throws IOException {
+ ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
+ FileStatus status = fs.getFileStatus(hugefile);
+ ContractTestUtils.assertIsFile(hugefile, status);
+ assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
+ }
+
+ /**
+ * Calculate megabits per second from the specified values for bytes and
+ * milliseconds.
+ * @param bytes The number of bytes.
+ * @param milliseconds The number of milliseconds.
+ * @return The number of megabits per second.
+ */
+ private static double toMbps(long bytes, long milliseconds) {
+ return bytes / 1000.0 * 8 / milliseconds;
+ }
+
+ @Test
+ public void test_0100_CreateHugeFile() throws IOException {
+ createTestFileAndSetLength();
+ }
+
+ @Test
+ public void test_0200_BasicReadTest() throws Exception {
+ assumeHugeFileExists();
+
+ try (
+ FSDataInputStream inputStreamV1
+ = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
+
+ FSDataInputStream inputStreamV2
+ = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
+ ) {
+ byte[] bufferV1 = new byte[3 * MEGABYTE];
+ byte[] bufferV2 = new byte[bufferV1.length];
+
+ // v1 forward seek and read a kilobyte into first kilobyte of bufferV1
+ inputStreamV1.seek(5 * MEGABYTE);
+ int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, KILOBYTE);
+ assertEquals(KILOBYTE, numBytesReadV1);
+
+ // v2 forward seek and read a kilobyte into first kilobyte of bufferV2
+ inputStreamV2.seek(5 * MEGABYTE);
+ int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, KILOBYTE);
+ assertEquals(KILOBYTE, numBytesReadV2);
+
+ assertArrayEquals(bufferV1, bufferV2);
+
+ int len = MEGABYTE;
+ int offset = bufferV1.length - len;
+
+ // v1 reverse seek and read a megabyte into last megabyte of bufferV1
+ inputStreamV1.seek(3 * MEGABYTE);
+ numBytesReadV1 = inputStreamV1.read(bufferV1, offset, len);
+ assertEquals(len, numBytesReadV1);
+
+ // v2 reverse seek and read a megabyte into last megabyte of bufferV2
+ inputStreamV2.seek(3 * MEGABYTE);
+ numBytesReadV2 = inputStreamV2.read(bufferV2, offset, len);
+ assertEquals(len, numBytesReadV2);
+
+ assertArrayEquals(bufferV1, bufferV2);
+ }
+ }
+
+ @Test
+ public void test_0201_RandomReadTest() throws Exception {
+ assumeHugeFileExists();
+
+ try (
+ FSDataInputStream inputStreamV1
+ = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
+
+ FSDataInputStream inputStreamV2
+ = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
+ ) {
+ final int bufferSize = 4 * KILOBYTE;
+ byte[] bufferV1 = new byte[bufferSize];
+ byte[] bufferV2 = new byte[bufferV1.length];
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ inputStreamV1.seek(0);
+ inputStreamV2.seek(0);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ int seekPosition = 2 * KILOBYTE;
+ inputStreamV1.seek(seekPosition);
+ inputStreamV2.seek(seekPosition);
+
+ inputStreamV1.seek(0);
+ inputStreamV2.seek(0);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ seekPosition = 5 * KILOBYTE;
+ inputStreamV1.seek(seekPosition);
+ inputStreamV2.seek(seekPosition);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ seekPosition = 10 * KILOBYTE;
+ inputStreamV1.seek(seekPosition);
+ inputStreamV2.seek(seekPosition);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ seekPosition = 4100 * KILOBYTE;
+ inputStreamV1.seek(seekPosition);
+ inputStreamV2.seek(seekPosition);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+ }
+ }
+
+ private void verifyConsistentReads(FSDataInputStream inputStreamV1,
+ FSDataInputStream inputStreamV2,
+ byte[] bufferV1,
+ byte[] bufferV2) throws IOException {
+ int size = bufferV1.length;
+ final int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, size);
+ assertEquals("Bytes read from V1 stream", size, numBytesReadV1);
+
+ final int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, size);
+ assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
+
+ assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
+ }
+
+ /**
+ * Validates the implementation of InputStream.markSupported.
+ * @throws IOException
+ */
+ @Test
+ public void test_0301_MarkSupportedV1() throws IOException {
+ validateMarkSupported(accountUsingInputStreamV1.getFileSystem());
+ }
+
+ /**
+ * Validates the implementation of InputStream.markSupported.
+ * @throws IOException
+ */
+ @Test
+ public void test_0302_MarkSupportedV2() throws IOException {
+ validateMarkSupported(accountUsingInputStreamV1.getFileSystem());
+ }
+
+ private void validateMarkSupported(FileSystem fs) throws IOException {
+ assumeHugeFileExists();
+ try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+ assertTrue("mark is not supported", inputStream.markSupported());
+ }
+ }
+
+ /**
+ * Validates the implementation of InputStream.mark and reset
+ * for version 1 of the block blob input stream.
+ * @throws Exception
+ */
+ @Test
+ public void test_0303_MarkAndResetV1() throws Exception {
+ validateMarkAndReset(accountUsingInputStreamV1.getFileSystem());
+ }
+
+ /**
+ * Validates the implementation of InputStream.mark and reset
+ * for version 2 of the block blob input stream.
+ * @throws Exception
+ */
+ @Test
+ public void test_0304_MarkAndResetV2() throws Exception {
+ validateMarkAndReset(accountUsingInputStreamV2.getFileSystem());
+ }
+
+ private void validateMarkAndReset(FileSystem fs) throws Exception {
+ assumeHugeFileExists();
+ try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+ inputStream.mark(KILOBYTE - 1);
+
+ byte[] buffer = new byte[KILOBYTE];
+ int bytesRead = inputStream.read(buffer);
+ assertEquals(buffer.length, bytesRead);
+
+ inputStream.reset();
+ assertEquals("rest -> pos 0", 0, inputStream.getPos());
+
+ inputStream.mark(8 * KILOBYTE - 1);
+
+ buffer = new byte[8 * KILOBYTE];
+ bytesRead = inputStream.read(buffer);
+ assertEquals(buffer.length, bytesRead);
+
+ intercept(IOException.class,
+ "Resetting to invalid mark",
+ new Callable<FSDataInputStream>() {
+ @Override
+ public FSDataInputStream call() throws Exception {
+ inputStream.reset();
+ return inputStream;
+ }
+ }
+ );
+ }
+ }
+
+ /**
+ * Validates the implementation of Seekable.seekToNewSource, which should
+ * return false for version 1 of the block blob input stream.
+ * @throws IOException
+ */
+ @Test
+ public void test_0305_SeekToNewSourceV1() throws IOException {
+ validateSeekToNewSource(accountUsingInputStreamV1.getFileSystem());
+ }
+
+ /**
+ * Validates the implementation of Seekable.seekToNewSource, which should
+ * return false for version 2 of the block blob input stream.
+ * @throws IOException
+ */
+ @Test
+ public void test_0306_SeekToNewSourceV2() throws IOException {
+ validateSeekToNewSource(accountUsingInputStreamV2.getFileSystem());
+ }
+
+ private void validateSeekToNewSource(FileSystem fs) throws IOException {
+ assumeHugeFileExists();
+ try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+ assertFalse(inputStream.seekToNewSource(0));
+ }
+ }
+
+ /**
+ * Validates the implementation of InputStream.skip and ensures there is no
+ * network I/O for version 1 of the block blob input stream.
+ * @throws Exception
+ */
+ @Test
+ public void test_0307_SkipBoundsV1() throws Exception {
+ validateSkipBounds(accountUsingInputStreamV1.getFileSystem());
+ }
+
+ /**
+ * Validates the implementation of InputStream.skip and ensures there is no
+ * network I/O for version 2 of the block blob input stream.
+ * @throws Exception
+ */
+ @Test
+ public void test_0308_SkipBoundsV2() throws Exception {
+ validateSkipBounds(accountUsingInputStreamV2.getFileSystem());
+ }
+
+ private void validateSkipBounds(FileSystem fs) throws Exception {
+ assumeHugeFileExists();
+ try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+ NanoTimer timer = new NanoTimer();
+
+ long skipped = inputStream.skip(-1);
+ assertEquals(0, skipped);
+
+ skipped = inputStream.skip(0);
+ assertEquals(0, skipped);
+
+ assertTrue(testFileLength > 0);
+
+ skipped = inputStream.skip(testFileLength);
+ assertEquals(testFileLength, skipped);
+
+ intercept(EOFException.class,
+ new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ return inputStream.skip(1);
+ }
+ }
+ );
+ long elapsedTimeMs = timer.elapsedTimeMs();
+ assertTrue(
+ String.format(
+ "There should not be any network I/O (elapsedTimeMs=%1$d).",
+ elapsedTimeMs),
+ elapsedTimeMs < 20);
+ }
+ }
+
+ /**
+ * Validates the implementation of Seekable.seek and ensures there is no
+ * network I/O for forward seek.
+ * @throws Exception
+ */
+ @Test
+ public void test_0309_SeekBoundsV1() throws Exception {
+ validateSeekBounds(accountUsingInputStreamV1.getFileSystem());
+ }
+
+ /**
+ * Validates the implementation of Seekable.seek and ensures there is no
+ * network I/O for forward seek.
+ * @throws Exception
+ */
+ @Test
+ public void test_0310_SeekBoundsV2() throws Exception {
+ validateSeekBounds(accountUsingInputStreamV2.getFileSystem());
+ }
+
+ private void validateSeekBounds(FileSystem fs) throws Exception {
+ assumeHugeFileExists();
+ try (
+ FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
+ ) {
+ NanoTimer timer = new NanoTimer();
+
+ inputStream.seek(0);
+ assertEquals(0, inputStream.getPos());
+
+ intercept(EOFException.class,
+ FSExceptionMessages.NEGATIVE_SEEK,
+ new Callable<FSDataInputStream>() {
+ @Override
+ public FSDataInputStream call() throws Exception {
+ inputStream.seek(-1);
+ return inputStream;
+ }
+ }
+ );
+
+ assertTrue("Test file length only " + testFileLength, testFileLength > 0);
+ inputStream.seek(testFileLength);
+ assertEquals(testFileLength, inputStream.getPos());
+
+ intercept(EOFException.class,
+ FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
+ new Callable<FSDataInputStream>() {
+ @Override
+ public FSDataInputStream call() throws Exception {
+ inputStream.seek(testFileLength + 1);
+ return inputStream;
+ }
+ }
+ );
+
+ long elapsedTimeMs = timer.elapsedTimeMs();
+ assertTrue(
+ String.format(
+ "There should not be any network I/O (elapsedTimeMs=%1$d).",
+ elapsedTimeMs),
+ elapsedTimeMs < 20);
+ }
+ }
+
+ /**
+ * Validates the implementation of Seekable.seek, Seekable.getPos,
+ * and InputStream.available.
+ * @throws Exception
+ */
+ @Test
+ public void test_0311_SeekAndAvailableAndPositionV1() throws Exception {
+ validateSeekAndAvailableAndPosition(
+ accountUsingInputStreamV1.getFileSystem());
+ }
+
+ /**
+ * Validates the implementation of Seekable.seek, Seekable.getPos,
+ * and InputStream.available.
+ * @throws Exception
+ */
+ @Test
+ public void test_0312_SeekAndAvailableAndPositionV2() throws Exception {
+ validateSeekAndAvailableAndPosition(
+ accountUsingInputStreamV2.getFileSystem());
+ }
+
+ private void validateSeekAndAvailableAndPosition(FileSystem fs)
+ throws Exception {
+ assumeHugeFileExists();
+ try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+ byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
+ byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
+ byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
+ byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};
+ byte[] buffer = new byte[3];
+
+ int bytesRead = inputStream.read(buffer);
+ assertEquals(buffer.length, bytesRead);
+ assertArrayEquals(expected1, buffer);
+ assertEquals(buffer.length, inputStream.getPos());
+ assertEquals(testFileLength - inputStream.getPos(),
+ inputStream.available());
+
+ bytesRead = inputStream.read(buffer);
+ assertEquals(buffer.length, bytesRead);
+ assertArrayEquals(expected2, buffer);
+ assertEquals(2 * buffer.length, inputStream.getPos());
+ assertEquals(testFileLength - inputStream.getPos(),
+ inputStream.available());
+
+ // reverse seek
+ int seekPos = 0;
+ inputStream.seek(seekPos);
+
+ bytesRead = inputStream.read(buffer);
+ assertEquals(buffer.length, bytesRead);
+ assertArrayEquals(expected1, buffer);
+ assertEquals(buffer.length + seekPos, inputStream.getPos());
+ assertEquals(testFileLength - inputStream.getPos(),
+ inputStream.available());
+
+ // reverse seek
+ seekPos = 1;
+ inputStream.seek(seekPos);
+
+ bytesRead = inputStream.read(buffer);
+ assertEquals(buffer.length, bytesRead);
+ assertArrayEquals(expected3, buffer);
+ assertEquals(buffer.length + seekPos, inputStream.getPos());
+ assertEquals(testFileLength - inputStream.getPos(),
+ inputStream.available());
+
+ // forward seek
+ seekPos = 6;
+ inputStream.seek(seekPos);
+
+ bytesRead = inputStream.read(buffer);
+ assertEquals(buffer.length, bytesRead);
+ assertArrayEquals(expected4, buffer);
+ assertEquals(buffer.length + seekPos, inputStream.getPos());
+ assertEquals(testFileLength - inputStream.getPos(),
+ inputStream.available());
+ }
+ }
+
+ /**
+ * Validates the implementation of InputStream.skip, Seekable.getPos,
+ * and InputStream.available.
+ * @throws IOException
+ */
+ @Test
+ public void test_0313_SkipAndAvailableAndPositionV1() throws IOException {
+ validateSkipAndAvailableAndPosition(
+ accountUsingInputStreamV1.getFileSystem());
+ }
+
+ /**
+ * Validates the implementation of InputStream.skip, Seekable.getPos,
+ * and InputStream.available.
+ * @throws IOException
+ */
+ @Test
+ public void test_0314_SkipAndAvailableAndPositionV2() throws IOException {
+ validateSkipAndAvailableAndPosition(
+ accountUsingInputStreamV1.getFileSystem());
+ }
+
+ private void validateSkipAndAvailableAndPosition(FileSystem fs)
+ throws IOException {
+ assumeHugeFileExists();
+ try (
+ FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
+ ) {
+ byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
+ byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
+ byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
+ byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};
+
+ assertEquals(testFileLength, inputStream.available());
+ assertEquals(0, inputStream.getPos());
+
+ int n = 3;
+ long skipped = inputStream.skip(n);
+
+ assertEquals(skipped, inputStream.getPos());
+ assertEquals(testFileLength - inputStream.getPos(),
+ inputStream.available());
+ assertEquals(skipped, n);
+
+ byte[] buffer = new byte[3];
+ int bytesRead = inputStream.read(buffer);
+ assertEquals(buffer.length, bytesRead);
+ assertArrayEquals(expected2, buffer);
+ assertEquals(buffer.length + skipped, inputStream.getPos());
+ assertEquals(testFileLength - inputStream.getPos(),
+ inputStream.available());
+
+ // does skip still work after seek?
+ int seekPos = 1;
+ inputStream.seek(seekPos);
+
+ bytesRead = inputStream.read(buffer);
+ assertEquals(buffer.length, bytesRead);
+ assertArrayEquals(expected3, buffer);
+ assertEquals(buffer.length + seekPos, inputStream.getPos());
+ assertEquals(testFileLength - inputStream.getPos(),
+ inputStream.available());
+
+ long currentPosition = inputStream.getPos();
+ n = 2;
+ skipped = inputStream.skip(n);
+
+ assertEquals(currentPosition + skipped, inputStream.getPos());
+ assertEquals(testFileLength - inputStream.getPos(),
+ inputStream.available());
+ assertEquals(skipped, n);
+
+ bytesRead = inputStream.read(buffer);
+ assertEquals(buffer.length, bytesRead);
+ assertArrayEquals(expected4, buffer);
+ assertEquals(buffer.length + skipped + currentPosition,
+ inputStream.getPos());
+ assertEquals(testFileLength - inputStream.getPos(),
+ inputStream.available());
+ }
+ }
+
+ /**
+ * Ensures parity in the performance of sequential read for
+ * version 1 and version 2 of the block blob input stream.
+ * @throws IOException
+ */
+ @Test
+ public void test_0315_SequentialReadPerformance() throws IOException {
+ assumeHugeFileExists();
+ final int maxAttempts = 10;
+ final double maxAcceptableRatio = 1.01;
+ double v1ElapsedMs = 0, v2ElapsedMs = 0;
+ double ratio = Double.MAX_VALUE;
+ for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
+ v1ElapsedMs = sequentialRead(1,
+ accountUsingInputStreamV1.getFileSystem(), false);
+ v2ElapsedMs = sequentialRead(2,
+ accountUsingInputStreamV2.getFileSystem(), false);
+ ratio = v2ElapsedMs / v1ElapsedMs;
+ LOG.info(String.format(
+ "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f",
+ (long) v1ElapsedMs,
+ (long) v2ElapsedMs,
+ ratio));
+ }
+ assertTrue(String.format(
+ "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d,"
+ + " v2ElapsedMs=%2$d, ratio=%3$.2f",
+ (long) v1ElapsedMs,
+ (long) v2ElapsedMs,
+ ratio),
+ ratio < maxAcceptableRatio);
+ }
+
+ /**
+ * Ensures parity in the performance of sequential read after reverse seek for
+ * version 2 of the block blob input stream.
+ * @throws IOException
+ */
+ @Test
+ public void test_0316_SequentialReadAfterReverseSeekPerformanceV2()
+ throws IOException {
+ assumeHugeFileExists();
+ final int maxAttempts = 10;
+ final double maxAcceptableRatio = 1.01;
+ double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0;
+ double ratio = Double.MAX_VALUE;
+ for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
+ beforeSeekElapsedMs = sequentialRead(2,
+ accountUsingInputStreamV2.getFileSystem(), false);
+ afterSeekElapsedMs = sequentialRead(2,
+ accountUsingInputStreamV2.getFileSystem(), true);
+ ratio = afterSeekElapsedMs / beforeSeekElapsedMs;
+ LOG.info(String.format(
+ "beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d, ratio=%3$.2f",
+ (long) beforeSeekElapsedMs,
+ (long) afterSeekElapsedMs,
+ ratio));
+ }
+ assertTrue(String.format(
+ "Performance of version 2 after reverse seek is not acceptable:"
+ + " beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d,"
+ + " ratio=%3$.2f",
+ (long) beforeSeekElapsedMs,
+ (long) afterSeekElapsedMs,
+ ratio),
+ ratio < maxAcceptableRatio);
+ }
+
+ private long sequentialRead(int version,
+ FileSystem fs,
+ boolean afterReverseSeek) throws IOException {
+ byte[] buffer = new byte[16 * KILOBYTE];
+ long totalBytesRead = 0;
+ long bytesRead = 0;
+
+ try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+ if (afterReverseSeek) {
+ while (bytesRead > 0 && totalBytesRead < 4 * MEGABYTE) {
+ bytesRead = inputStream.read(buffer);
+ totalBytesRead += bytesRead;
+ }
+ totalBytesRead = 0;
+ inputStream.seek(0);
+ }
+
+ NanoTimer timer = new NanoTimer();
+ while ((bytesRead = inputStream.read(buffer)) > 0) {
+ totalBytesRead += bytesRead;
+ }
+ long elapsedTimeMs = timer.elapsedTimeMs();
+
+ LOG.info(String.format(
+ "v%1$d: bytesRead=%2$d, elapsedMs=%3$d, Mbps=%4$.2f,"
+ + " afterReverseSeek=%5$s",
+ version,
+ totalBytesRead,
+ elapsedTimeMs,
+ toMbps(totalBytesRead, elapsedTimeMs),
+ afterReverseSeek));
+
+ assertEquals(testFileLength, totalBytesRead);
+ inputStream.close();
+ return elapsedTimeMs;
+ }
+ }
+
+ @Test
+ public void test_0317_RandomReadPerformance() throws IOException {
+ assumeHugeFileExists();
+ final int maxAttempts = 10;
+ final double maxAcceptableRatio = 0.10;
+ double v1ElapsedMs = 0, v2ElapsedMs = 0;
+ double ratio = Double.MAX_VALUE;
+ for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
+ v1ElapsedMs = randomRead(1,
+ accountUsingInputStreamV1.getFileSystem());
+ v2ElapsedMs = randomRead(2,
+ accountUsingInputStreamV2.getFileSystem());
+ ratio = v2ElapsedMs / v1ElapsedMs;
+ LOG.info(String.format(
+ "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f",
+ (long) v1ElapsedMs,
+ (long) v2ElapsedMs,
+ ratio));
+ }
+ assertTrue(String.format(
+ "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d,"
+ + " v2ElapsedMs=%2$d, ratio=%3$.2f",
+ (long) v1ElapsedMs,
+ (long) v2ElapsedMs,
+ ratio),
+ ratio < maxAcceptableRatio);
+ }
+
+ private long randomRead(int version, FileSystem fs) throws IOException {
+ assumeHugeFileExists();
+ final int minBytesToRead = 2 * MEGABYTE;
+ Random random = new Random();
+ byte[] buffer = new byte[8 * KILOBYTE];
+ long totalBytesRead = 0;
+ long bytesRead = 0;
+ try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+ NanoTimer timer = new NanoTimer();
+
+ do {
+ bytesRead = inputStream.read(buffer);
+ totalBytesRead += bytesRead;
+ inputStream.seek(random.nextInt(
+ (int) (testFileLength - buffer.length)));
+ } while (bytesRead > 0 && totalBytesRead < minBytesToRead);
+
+ long elapsedTimeMs = timer.elapsedTimeMs();
+
+ inputStream.close();
+
+ LOG.info(String.format(
+ "v%1$d: totalBytesRead=%2$d, elapsedTimeMs=%3$d, Mbps=%4$.2f",
+ version,
+ totalBytesRead,
+ elapsedTimeMs,
+ toMbps(totalBytesRead, elapsedTimeMs)));
+
+ assertTrue(minBytesToRead <= totalBytesRead);
+
+ return elapsedTimeMs;
+ }
+ }
+
+ @Test
+ public void test_999_DeleteHugeFiles() throws IOException {
+ try {
+ NanoTimer timer = new NanoTimer();
+ NativeAzureFileSystem fs = getFileSystem();
+ fs.delete(TEST_FILE_PATH, false);
+ timer.end("time to delete %s", TEST_FILE_PATH);
+ } finally {
+ // clean up the test account
+ AzureTestUtils.cleanupTestAccount(accountUsingInputStreamV1);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6b08f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestContainerChecks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestContainerChecks.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestContainerChecks.java
new file mode 100644
index 0000000..cc3baf5
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestContainerChecks.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.junit.Assume.assumeNotNull;
+
+import java.io.FileNotFoundException;
+import java.util.EnumSet;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.CreateOptions;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.microsoft.azure.storage.blob.BlobOutputStream;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+
+/**
+ * Tests that WASB creates containers only if needed.
+ */
+public class ITestContainerChecks extends AbstractWasbTestWithTimeout {
+ private AzureBlobStorageTestAccount testAccount;
+ private boolean runningInSASMode = false;
+
+ @After
+ public void tearDown() throws Exception {
+ testAccount = AzureTestUtils.cleanup(testAccount);
+ }
+
+ @Before
+ public void setMode() {
+ runningInSASMode = AzureBlobStorageTestAccount.createTestConfiguration().
+ getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE, false);
+ }
+
+ @Test
+ public void testContainerExistAfterDoesNotExist() throws Exception {
+ testAccount = blobStorageTestAccount();
+ assumeNotNull(testAccount);
+ CloudBlobContainer container = testAccount.getRealContainer();
+ FileSystem fs = testAccount.getFileSystem();
+
+ // Starting off with the container not there
+ assertFalse(container.exists());
+
+ // A list shouldn't create the container and will set file system store
+ // state to DoesNotExist
+ try {
+ fs.listStatus(new Path("/"));
+ assertTrue("Should've thrown.", false);
+ } catch (FileNotFoundException ex) {
+ assertTrue("Unexpected exception: " + ex,
+ ex.getMessage().contains("does not exist."));
+ }
+ assertFalse(container.exists());
+
+ // Create a container outside of the WASB FileSystem
+ container.create();
+ // Add a file to the container outside of the WASB FileSystem
+ CloudBlockBlob blob = testAccount.getBlobReference("foo");
+ BlobOutputStream outputStream = blob.openOutputStream();
+ outputStream.write(new byte[10]);
+ outputStream.close();
+
+ // Make sure the file is visible
+ assertTrue(fs.exists(new Path("/foo")));
+ assertTrue(container.exists());
+ }
+
+ protected AzureBlobStorageTestAccount blobStorageTestAccount()
+ throws Exception {
+ return AzureBlobStorageTestAccount.create("",
+ EnumSet.noneOf(CreateOptions.class));
+ }
+
+ @Test
+ public void testContainerCreateAfterDoesNotExist() throws Exception {
+ testAccount = blobStorageTestAccount();
+ assumeNotNull(testAccount);
+ CloudBlobContainer container = testAccount.getRealContainer();
+ FileSystem fs = testAccount.getFileSystem();
+
+ // Starting off with the container not there
+ assertFalse(container.exists());
+
+ // A list shouldn't create the container and will set file system store
+ // state to DoesNotExist
+ try {
+ assertNull(fs.listStatus(new Path("/")));
+ assertTrue("Should've thrown.", false);
+ } catch (FileNotFoundException ex) {
+ assertTrue("Unexpected exception: " + ex,
+ ex.getMessage().contains("does not exist."));
+ }
+ assertFalse(container.exists());
+
+ // Create a container outside of the WASB FileSystem
+ container.create();
+
+ // Write should succeed
+ assertTrue(fs.createNewFile(new Path("/foo")));
+ assertTrue(container.exists());
+ }
+
+ @Test
+ public void testContainerCreateOnWrite() throws Exception {
+ testAccount = blobStorageTestAccount();
+ assumeNotNull(testAccount);
+ CloudBlobContainer container = testAccount.getRealContainer();
+ FileSystem fs = testAccount.getFileSystem();
+
+ // Starting off with the container not there
+ assertFalse(container.exists());
+
+ // A list shouldn't create the container.
+ try {
+ fs.listStatus(new Path("/"));
+ assertTrue("Should've thrown.", false);
+ } catch (FileNotFoundException ex) {
+ assertTrue("Unexpected exception: " + ex,
+ ex.getMessage().contains("does not exist."));
+ }
+ assertFalse(container.exists());
+
+ // Neither should a read.
+ Path foo = new Path("/testContainerCreateOnWrite-foo");
+ Path bar = new Path("/testContainerCreateOnWrite-bar");
+ LambdaTestUtils.intercept(FileNotFoundException.class,
+ new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ fs.open(foo).close();
+ return "Stream to " + foo;
+ }
+ }
+ );
+ assertFalse(container.exists());
+
+ // Neither should a rename
+ assertFalse(fs.rename(foo, bar));
+ assertFalse(container.exists());
+
+ // But a write should.
+ assertTrue(fs.createNewFile(foo));
+ assertTrue(container.exists());
+ }
+
+ @Test
+ public void testContainerChecksWithSas() throws Exception {
+
+ Assume.assumeFalse(runningInSASMode);
+ testAccount = AzureBlobStorageTestAccount.create("",
+ EnumSet.of(CreateOptions.UseSas));
+ assumeNotNull(testAccount);
+ CloudBlobContainer container = testAccount.getRealContainer();
+ FileSystem fs = testAccount.getFileSystem();
+
+ // The container shouldn't be there
+ assertFalse(container.exists());
+
+ // A write should just fail
+ try {
+ fs.createNewFile(new Path("/testContainerChecksWithSas-foo"));
+ assertFalse("Should've thrown.", true);
+ } catch (AzureException ex) {
+ }
+ assertFalse(container.exists());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6b08f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java
new file mode 100644
index 0000000..a45dae4
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.FileNotFoundException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.azure.ExceptionHandlingTestHelper.*;
+
+/**
+ * Single threaded exception handling.
+ */
+public class ITestFileSystemOperationExceptionHandling
+ extends AbstractWasbTestBase {
+
+ private FSDataInputStream inputStream = null;
+
+ private Path testPath;
+ private Path testFolderPath;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ testPath = path("testfile.dat");
+ testFolderPath = path("testfolder");
+ }
+
+ /**
+ * Helper method that creates a InputStream to validate exceptions
+ * for various scenarios.
+ */
+ private void setupInputStreamToTest(AzureBlobStorageTestAccount testAccount)
+ throws Exception {
+
+ FileSystem fs = testAccount.getFileSystem();
+
+ // Step 1: Create a file and write dummy data.
+ Path base = methodPath();
+ Path testFilePath1 = new Path(base, "test1.dat");
+ Path testFilePath2 = new Path(base, "test2.dat");
+ FSDataOutputStream outputStream = fs.create(testFilePath1);
+ String testString = "This is a test string";
+ outputStream.write(testString.getBytes());
+ outputStream.close();
+
+ // Step 2: Open a read stream on the file.
+ inputStream = fs.open(testFilePath1);
+
+ // Step 3: Rename the file
+ fs.rename(testFilePath1, testFilePath2);
+ }
+
+ /**
+ * Tests a basic single threaded read scenario for Page blobs.
+ */
+ @Test(expected=FileNotFoundException.class)
+ public void testSingleThreadedPageBlobReadScenario() throws Throwable {
+ AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
+ setupInputStreamToTest(testAccount);
+ byte[] readBuffer = new byte[512];
+ inputStream.read(readBuffer);
+ }
+
+ /**
+ * Tests a basic single threaded seek scenario for Page blobs.
+ */
+ @Test(expected=FileNotFoundException.class)
+ public void testSingleThreadedPageBlobSeekScenario() throws Throwable {
+ AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
+ setupInputStreamToTest(testAccount);
+ inputStream.seek(5);
+ }
+
+ /**
+ * Test a basic single thread seek scenario for Block blobs.
+ */
+ @Test(expected=FileNotFoundException.class)
+ public void testSingleThreadBlockBlobSeekScenario() throws Throwable {
+
+ AzureBlobStorageTestAccount testAccount = createTestAccount();
+ setupInputStreamToTest(testAccount);
+ inputStream.seek(5);
+ inputStream.read();
+ }
+
+ /**
+ * Tests a basic single threaded read scenario for Block blobs.
+ */
+ @Test(expected=FileNotFoundException.class)
+ public void testSingledThreadBlockBlobReadScenario() throws Throwable{
+ AzureBlobStorageTestAccount testAccount = createTestAccount();
+ setupInputStreamToTest(testAccount);
+ byte[] readBuffer = new byte[512];
+ inputStream.read(readBuffer);
+ }
+
+ /**
+ * Tests basic single threaded setPermission scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testSingleThreadedBlockBlobSetPermissionScenario() throws Throwable {
+
+ createEmptyFile(createTestAccount(), testPath);
+ fs.delete(testPath, true);
+ fs.setPermission(testPath,
+ new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
+ }
+
+ /**
+ * Tests basic single threaded setPermission scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testSingleThreadedPageBlobSetPermissionScenario()
+ throws Throwable {
+ createEmptyFile(getPageBlobTestStorageAccount(), testPath);
+ fs.delete(testPath, true);
+ fs.setOwner(testPath, "testowner", "testgroup");
+ }
+
+ /**
+ * Tests basic single threaded setPermission scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testSingleThreadedBlockBlobSetOwnerScenario() throws Throwable {
+
+ createEmptyFile(createTestAccount(), testPath);
+ fs.delete(testPath, true);
+ fs.setOwner(testPath, "testowner", "testgroup");
+ }
+
+ /**
+ * Tests basic single threaded setPermission scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testSingleThreadedPageBlobSetOwnerScenario() throws Throwable {
+ createEmptyFile(getPageBlobTestStorageAccount(),
+ testPath);
+ fs.delete(testPath, true);
+ fs.setPermission(testPath,
+ new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
+ }
+
+ /**
+ * Test basic single threaded listStatus scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testSingleThreadedBlockBlobListStatusScenario() throws Throwable {
+ createTestFolder(createTestAccount(),
+ testFolderPath);
+ fs.delete(testFolderPath, true);
+ fs.listStatus(testFolderPath);
+ }
+
+ /**
+ * Test basic single threaded listStatus scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testSingleThreadedPageBlobListStatusScenario() throws Throwable {
+ createTestFolder(getPageBlobTestStorageAccount(),
+ testFolderPath);
+ fs.delete(testFolderPath, true);
+ fs.listStatus(testFolderPath);
+ }
+
+ /**
+ * Test basic single threaded listStatus scenario.
+ */
+ @Test
+ public void testSingleThreadedBlockBlobRenameScenario() throws Throwable {
+
+ createEmptyFile(createTestAccount(),
+ testPath);
+ Path dstPath = new Path("dstFile.dat");
+ fs.delete(testPath, true);
+ boolean renameResult = fs.rename(testPath, dstPath);
+ assertFalse(renameResult);
+ }
+
+ /**
+ * Test basic single threaded listStatus scenario.
+ */
+ @Test
+ public void testSingleThreadedPageBlobRenameScenario() throws Throwable {
+
+ createEmptyFile(getPageBlobTestStorageAccount(),
+ testPath);
+ Path dstPath = new Path("dstFile.dat");
+ fs.delete(testPath, true);
+ boolean renameResult = fs.rename(testPath, dstPath);
+ assertFalse(renameResult);
+ }
+
+ /**
+ * Test basic single threaded listStatus scenario.
+ */
+ @Test
+ public void testSingleThreadedBlockBlobDeleteScenario() throws Throwable {
+
+ createEmptyFile(createTestAccount(),
+ testPath);
+ fs.delete(testPath, true);
+ boolean deleteResult = fs.delete(testPath, true);
+ assertFalse(deleteResult);
+ }
+
+ /**
+ * Test basic single threaded listStatus scenario.
+ */
+ @Test
+ public void testSingleThreadedPageBlobDeleteScenario() throws Throwable {
+
+ createEmptyFile(getPageBlobTestStorageAccount(),
+ testPath);
+ fs.delete(testPath, true);
+ boolean deleteResult = fs.delete(testPath, true);
+ assertFalse(deleteResult);
+ }
+
+ /**
+ * Test basic single threaded listStatus scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testSingleThreadedBlockBlobOpenScenario() throws Throwable {
+
+ createEmptyFile(createTestAccount(),
+ testPath);
+ fs.delete(testPath, true);
+ inputStream = fs.open(testPath);
+ }
+
+ /**
+ * Test delete then open a file.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testSingleThreadedPageBlobOpenScenario() throws Throwable {
+
+ createEmptyFile(getPageBlobTestStorageAccount(),
+ testPath);
+ fs.delete(testPath, true);
+ inputStream = fs.open(testPath);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+
+ ContractTestUtils.rm(fs, testPath, true, true);
+ super.tearDown();
+ }
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount()
+ throws Exception {
+ return AzureBlobStorageTestAccount.create();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6b08f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionMessage.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionMessage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionMessage.java
new file mode 100644
index 0000000..6d5e72e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionMessage.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.net.URI;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.NO_ACCESS_TO_CONTAINER_MSG;
+
+/**
+ * Test for error messages coming from SDK.
+ */
+public class ITestFileSystemOperationExceptionMessage
+ extends AbstractWasbTestWithTimeout {
+
+
+
+ @Test
+ public void testAnonymouseCredentialExceptionMessage() throws Throwable {
+
+ Configuration conf = AzureBlobStorageTestAccount.createTestConfiguration();
+ CloudStorageAccount account =
+ AzureBlobStorageTestAccount.createTestAccount(conf);
+ AzureTestUtils.assume("No test account", account != null);
+
+ String testStorageAccount = conf.get("fs.azure.test.account.name");
+ conf = new Configuration();
+ conf.set("fs.AbstractFileSystem.wasb.impl",
+ "org.apache.hadoop.fs.azure.Wasb");
+ conf.set("fs.azure.skip.metrics", "true");
+
+ String testContainer = UUID.randomUUID().toString();
+ String wasbUri = String.format("wasb://%s@%s",
+ testContainer, testStorageAccount);
+
+ try(NativeAzureFileSystem filesystem = new NativeAzureFileSystem()) {
+ filesystem.initialize(new URI(wasbUri), conf);
+ fail("Expected an exception, got " + filesystem);
+ } catch (Exception ex) {
+
+ Throwable innerException = ex.getCause();
+ while (innerException != null
+ && !(innerException instanceof AzureException)) {
+ innerException = innerException.getCause();
+ }
+
+ if (innerException != null) {
+ GenericTestUtils.assertExceptionContains(String.format(
+ NO_ACCESS_TO_CONTAINER_MSG, testStorageAccount, testContainer),
+ ex);
+ } else {
+ fail("No inner azure exception");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6b08f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java
new file mode 100644
index 0000000..175a9ec
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.FileNotFoundException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.fs.azure.ExceptionHandlingTestHelper.*;
+
+/**
+ * Multithreaded operations on FS, verify failures are as expected.
+ */
+public class ITestFileSystemOperationsExceptionHandlingMultiThreaded
+ extends AbstractWasbTestBase {
+
+ FSDataInputStream inputStream = null;
+
+ private Path testPath;
+ private Path testFolderPath;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ testPath = path("testfile.dat");
+ testFolderPath = path("testfolder");
+ }
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+
+ IOUtils.closeStream(inputStream);
+ ContractTestUtils.rm(fs, testPath, true, false);
+ ContractTestUtils.rm(fs, testFolderPath, true, false);
+ super.tearDown();
+ }
+
+ /**
+ * Helper method to creates an input stream to test various scenarios.
+ */
+ private void getInputStreamToTest(FileSystem fs, Path testPath)
+ throws Throwable {
+
+ FSDataOutputStream outputStream = fs.create(testPath);
+ String testString = "This is a test string";
+ outputStream.write(testString.getBytes());
+ outputStream.close();
+
+ inputStream = fs.open(testPath);
+ }
+
+ /**
+ * Test to validate correct exception is thrown for Multithreaded read
+ * scenario for block blobs.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testMultiThreadedBlockBlobReadScenario() throws Throwable {
+
+ AzureBlobStorageTestAccount testAccount = createTestAccount();
+ NativeAzureFileSystem fs = testAccount.getFileSystem();
+ Path base = methodPath();
+ Path testFilePath1 = new Path(base, "test1.dat");
+ Path renamePath = new Path(base, "test2.dat");
+ getInputStreamToTest(fs, testFilePath1);
+ Thread renameThread = new Thread(
+ new RenameThread(fs, testFilePath1, renamePath));
+ renameThread.start();
+
+ renameThread.join();
+
+ byte[] readBuffer = new byte[512];
+ inputStream.read(readBuffer);
+ }
+
+ /**
+ * Test to validate correct exception is thrown for Multithreaded seek
+ * scenario for block blobs.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testMultiThreadBlockBlobSeekScenario() throws Throwable {
+
+/*
+ AzureBlobStorageTestAccount testAccount = createTestAccount();
+ fs = testAccount.getFileSystem();
+*/
+ Path base = methodPath();
+ Path testFilePath1 = new Path(base, "test1.dat");
+ Path renamePath = new Path(base, "test2.dat");
+
+ getInputStreamToTest(fs, testFilePath1);
+ Thread renameThread = new Thread(
+ new RenameThread(fs, testFilePath1, renamePath));
+ renameThread.start();
+
+ renameThread.join();
+
+ inputStream.seek(5);
+ inputStream.read();
+ }
+
+ /**
+ * Tests basic multi threaded setPermission scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testMultiThreadedPageBlobSetPermissionScenario()
+ throws Throwable {
+ createEmptyFile(
+ getPageBlobTestStorageAccount(),
+ testPath);
+ Thread t = new Thread(new DeleteThread(fs, testPath));
+ t.start();
+ while (t.isAlive()) {
+ fs.setPermission(testPath,
+ new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
+ }
+ fs.setPermission(testPath,
+ new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
+ }
+
+ /**
+ * Tests basic multi threaded setPermission scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testMultiThreadedBlockBlobSetPermissionScenario()
+ throws Throwable {
+ createEmptyFile(createTestAccount(),
+ testPath);
+ Thread t = new Thread(new DeleteThread(fs, testPath));
+ t.start();
+ while (t.isAlive()) {
+ fs.setPermission(testPath,
+ new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
+ }
+ fs.setPermission(testPath,
+ new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
+ }
+
+ /**
+ * Tests basic multi threaded setPermission scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testMultiThreadedPageBlobOpenScenario() throws Throwable {
+
+ createEmptyFile(createTestAccount(),
+ testPath);
+ Thread t = new Thread(new DeleteThread(fs, testPath));
+ t.start();
+ while (t.isAlive()) {
+ inputStream = fs.open(testPath);
+ inputStream.close();
+ }
+
+ inputStream = fs.open(testPath);
+ inputStream.close();
+ }
+
+ /**
+ * Tests basic multi threaded setPermission scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testMultiThreadedBlockBlobOpenScenario() throws Throwable {
+
+ createEmptyFile(
+ getPageBlobTestStorageAccount(),
+ testPath);
+ Thread t = new Thread(new DeleteThread(fs, testPath));
+ t.start();
+
+ while (t.isAlive()) {
+ inputStream = fs.open(testPath);
+ inputStream.close();
+ }
+ inputStream = fs.open(testPath);
+ inputStream.close();
+ }
+
+ /**
+ * Tests basic multi threaded setOwner scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testMultiThreadedBlockBlobSetOwnerScenario() throws Throwable {
+
+ createEmptyFile(createTestAccount(), testPath);
+ Thread t = new Thread(new DeleteThread(fs, testPath));
+ t.start();
+ while (t.isAlive()) {
+ fs.setOwner(testPath, "testowner", "testgroup");
+ }
+ fs.setOwner(testPath, "testowner", "testgroup");
+ }
+
+ /**
+ * Tests basic multi threaded setOwner scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testMultiThreadedPageBlobSetOwnerScenario() throws Throwable {
+ createEmptyFile(
+ getPageBlobTestStorageAccount(),
+ testPath);
+ Thread t = new Thread(new DeleteThread(fs, testPath));
+ t.start();
+ while (t.isAlive()) {
+ fs.setOwner(testPath, "testowner", "testgroup");
+ }
+ fs.setOwner(testPath, "testowner", "testgroup");
+ }
+
+ /**
+ * Tests basic multi threaded listStatus scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testMultiThreadedBlockBlobListStatusScenario() throws Throwable {
+
+ createTestFolder(createTestAccount(),
+ testFolderPath);
+ Thread t = new Thread(new DeleteThread(fs, testFolderPath));
+ t.start();
+ while (t.isAlive()) {
+ fs.listStatus(testFolderPath);
+ }
+ fs.listStatus(testFolderPath);
+ }
+
+ /**
+ * Tests basic multi threaded listStatus scenario.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testMultiThreadedPageBlobListStatusScenario() throws Throwable {
+
+ createTestFolder(
+ getPageBlobTestStorageAccount(),
+ testFolderPath);
+ Thread t = new Thread(new DeleteThread(fs, testFolderPath));
+ t.start();
+ while (t.isAlive()) {
+ fs.listStatus(testFolderPath);
+ }
+ fs.listStatus(testFolderPath);
+ }
+
+ /**
+ * Test to validate correct exception is thrown for Multithreaded read
+ * scenario for page blobs.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testMultiThreadedPageBlobReadScenario() throws Throwable {
+
+ bindToTestAccount(getPageBlobTestStorageAccount());
+ Path base = methodPath();
+ Path testFilePath1 = new Path(base, "test1.dat");
+ Path renamePath = new Path(base, "test2.dat");
+
+ getInputStreamToTest(fs, testFilePath1);
+ Thread renameThread = new Thread(
+ new RenameThread(fs, testFilePath1, renamePath));
+ renameThread.start();
+
+ renameThread.join();
+ byte[] readBuffer = new byte[512];
+ inputStream.read(readBuffer);
+ }
+
+ /**
+ * Test to validate correct exception is thrown for Multithreaded seek
+ * scenario for page blobs.
+ */
+
+ @Test(expected = FileNotFoundException.class)
+ public void testMultiThreadedPageBlobSeekScenario() throws Throwable {
+
+ bindToTestAccount(getPageBlobTestStorageAccount());
+
+ Path base = methodPath();
+ Path testFilePath1 = new Path(base, "test1.dat");
+ Path renamePath = new Path(base, "test2.dat");
+
+ getInputStreamToTest(fs, testFilePath1);
+ Thread renameThread = new Thread(
+ new RenameThread(fs, testFilePath1, renamePath));
+ renameThread.start();
+
+ renameThread.join();
+ inputStream.seek(5);
+ }
+
+
+ /**
+ * Helper thread that just renames the test file.
+ */
+ private static class RenameThread implements Runnable {
+
+ private final FileSystem fs;
+ private final Path testPath;
+ private final Path renamePath;
+
+ RenameThread(FileSystem fs,
+ Path testPath,
+ Path renamePath) {
+ this.fs = fs;
+ this.testPath = testPath;
+ this.renamePath = renamePath;
+ }
+
+ @Override
+ public void run() {
+ try {
+ fs.rename(testPath, renamePath);
+ } catch (Exception e) {
+ // Swallowing the exception as the
+ // correctness of the test is controlled
+ // by the other thread
+ }
+ }
+ }
+
+ private static class DeleteThread implements Runnable {
+ private final FileSystem fs;
+ private final Path testPath;
+
+ DeleteThread(FileSystem fs, Path testPath) {
+ this.fs = fs;
+ this.testPath = testPath;
+ }
+
+ @Override
+ public void run() {
+ try {
+ fs.delete(testPath, true);
+ } catch (Exception e) {
+ // Swallowing the exception as the
+ // correctness of the test is controlled
+ // by the other thread
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org