Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2014/10/08 23:24:39 UTC
[2/5] HADOOP-10809. hadoop-azure: page blob support. Contributed by
Dexter Bradshaw, Mostafa Elhemali, Eric Hanson, and Mike Liddell.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index b03997c..047ea1b 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -33,7 +33,7 @@ import java.util.TimeZone;
import org.apache.commons.httpclient.URIException;
import org.apache.commons.httpclient.util.URIUtil;
-import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.lang.NotImplementedException;
import com.microsoft.windowsazure.storage.CloudStorageAccount;
import com.microsoft.windowsazure.storage.OperationContext;
@@ -44,10 +44,15 @@ import com.microsoft.windowsazure.storage.StorageUri;
import com.microsoft.windowsazure.storage.blob.BlobListingDetails;
import com.microsoft.windowsazure.storage.blob.BlobProperties;
import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
import com.microsoft.windowsazure.storage.blob.CloudBlobContainer;
import com.microsoft.windowsazure.storage.blob.CloudBlobDirectory;
import com.microsoft.windowsazure.storage.blob.CopyState;
import com.microsoft.windowsazure.storage.blob.ListBlobItem;
+import com.microsoft.windowsazure.storage.blob.PageRange;
+
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriBuilderException;
/**
* A mock implementation of the Azure Storage interaction layer for unit tests.
@@ -55,7 +60,8 @@ import com.microsoft.windowsazure.storage.blob.ListBlobItem;
*/
public class MockStorageInterface extends StorageInterface {
private InMemoryBlockBlobStore backingStore;
- private final ArrayList<PreExistingContainer> preExistingContainers = new ArrayList<MockStorageInterface.PreExistingContainer>();
+ private final ArrayList<PreExistingContainer> preExistingContainers =
+ new ArrayList<MockStorageInterface.PreExistingContainer>();
private String baseUriString;
public InMemoryBlockBlobStore getBackingStore() {
@@ -107,6 +113,33 @@ public class MockStorageInterface extends StorageInterface {
return null;
}
+ /**
+ * Utility function used to convert a given URI to a decoded string
+ * representation sent to the backing store. URIs coming as input
+ * to this class will be encoded by the URI class, and we want
+ * the underlying storage to store keys in their original UTF-8 form.
+ */
+ private static String convertUriToDecodedString(URI uri) {
+ try {
+ String result = URIUtil.decode(uri.toString());
+ return result;
+ } catch (URIException e) {
+ throw new AssertionError("Failed to decode URI: " + uri.toString());
+ }
+ }
+
+ private static URI convertKeyToEncodedUri(String key) {
+ try {
+ String encodedKey = URIUtil.encodePath(key);
+ URI uri = new URI(encodedKey);
+ return uri;
+ } catch (URISyntaxException e) {
+ throw new AssertionError("Failed to encode key: " + key);
+ } catch (URIException e) {
+ throw new AssertionError("Failed to encode key: " + key);
+ }
+ }
+
@Override
public CloudBlobContainerWrapper getContainerReference(String name)
throws URISyntaxException, StorageException {
@@ -196,6 +229,12 @@ public class MockStorageInterface extends StorageInterface {
false)), null, 0);
}
+ @Override
+ public CloudPageBlobWrapper getPageBlobReference(String blobAddressUri)
+ throws URISyntaxException, StorageException {
+ return new MockCloudPageBlobWrapper(new URI(blobAddressUri), null, 0);
+ }
+
// helper to create full URIs for directory and blob.
// use withTrailingSlash=true to get a good path for a directory.
private String fullUriString(String relativePath, boolean withTrailingSlash) {
@@ -260,24 +299,41 @@ public class MockStorageInterface extends StorageInterface {
BlobRequestOptions options, OperationContext opContext)
throws URISyntaxException, StorageException {
ArrayList<ListBlobItem> ret = new ArrayList<ListBlobItem>();
- String fullPrefix = prefix == null ? uri.toString() : new URI(
- uri.getScheme(), uri.getAuthority(), uri.getPath() + prefix,
- uri.getQuery(), uri.getFragment()).toString();
- boolean includeMetadata = listingDetails
- .contains(BlobListingDetails.METADATA);
+ URI searchUri = null;
+ if (prefix == null) {
+ searchUri = uri;
+ } else {
+ try {
+ searchUri = UriBuilder.fromUri(uri).path(prefix).build();
+ } catch (UriBuilderException e) {
+ throw new AssertionError("Failed to encode path: " + prefix);
+ }
+ }
+
+ String fullPrefix = convertUriToDecodedString(searchUri);
+ boolean includeMetadata = listingDetails.contains(BlobListingDetails.METADATA);
HashSet<String> addedDirectories = new HashSet<String>();
- for (InMemoryBlockBlobStore.ListBlobEntry current : backingStore
- .listBlobs(fullPrefix, includeMetadata)) {
+ for (InMemoryBlockBlobStore.ListBlobEntry current : backingStore.listBlobs(
+ fullPrefix, includeMetadata)) {
int indexOfSlash = current.getKey().indexOf('/', fullPrefix.length());
if (useFlatBlobListing || indexOfSlash < 0) {
- ret.add(new MockCloudBlockBlobWrapper(new URI(current.getKey()),
- current.getMetadata(), current.getContentLength()));
+ if (current.isPageBlob()) {
+ ret.add(new MockCloudPageBlobWrapper(
+ convertKeyToEncodedUri(current.getKey()),
+ current.getMetadata(),
+ current.getContentLength()));
+ } else {
+ ret.add(new MockCloudBlockBlobWrapper(
+ convertKeyToEncodedUri(current.getKey()),
+ current.getMetadata(),
+ current.getContentLength()));
+ }
} else {
String directoryName = current.getKey().substring(0, indexOfSlash);
if (!addedDirectories.contains(directoryName)) {
addedDirectories.add(current.getKey());
- ret.add(new MockCloudBlobDirectoryWrapper(new URI(directoryName
- + "/")));
+ ret.add(new MockCloudBlobDirectoryWrapper(new URI(
+ directoryName + "/")));
}
}
}
@@ -286,35 +342,35 @@ public class MockStorageInterface extends StorageInterface {
@Override
public StorageUri getStorageUri() {
- throw new UnsupportedOperationException();
+ throw new NotImplementedException();
}
-
}
- class MockCloudBlockBlobWrapper extends CloudBlockBlobWrapper {
- private URI uri;
- private HashMap<String, String> metadata = new HashMap<String, String>();
- private BlobProperties properties;
+ abstract class MockCloudBlobWrapper implements CloudBlobWrapper {
+ protected final URI uri;
+ protected HashMap<String, String> metadata =
+ new HashMap<String, String>();
+ protected BlobProperties properties;
- public MockCloudBlockBlobWrapper(URI uri, HashMap<String, String> metadata,
+ protected MockCloudBlobWrapper(URI uri, HashMap<String, String> metadata,
int length) {
this.uri = uri;
this.metadata = metadata;
this.properties = new BlobProperties();
this.properties.setLength(length);
- this.properties.setLastModified(Calendar.getInstance(
- TimeZone.getTimeZone("UTC")).getTime());
+ this.properties.setLastModified(
+ Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTime());
}
- private void refreshProperties(boolean getMetadata) {
- if (backingStore.exists(uri.toString())) {
- byte[] content = backingStore.getContent(uri.toString());
+ protected void refreshProperties(boolean getMetadata) {
+ if (backingStore.exists(convertUriToDecodedString(uri))) {
+ byte[] content = backingStore.getContent(convertUriToDecodedString(uri));
properties = new BlobProperties();
properties.setLength(content.length);
- properties.setLastModified(Calendar.getInstance(
- TimeZone.getTimeZone("UTC")).getTime());
+ properties.setLastModified(
+ Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTime());
if (getMetadata) {
- metadata = backingStore.getMetadata(uri.toString());
+ metadata = backingStore.getMetadata(convertUriToDecodedString(uri));
}
}
}
@@ -347,26 +403,27 @@ public class MockStorageInterface extends StorageInterface {
}
@Override
- public void startCopyFromBlob(CloudBlockBlobWrapper sourceBlob,
+ public void startCopyFromBlob(URI source,
OperationContext opContext) throws StorageException, URISyntaxException {
- backingStore.copy(sourceBlob.getUri().toString(), uri.toString());
- // it would be best if backingStore.properties.CopyState were tracked
- // If implemented, update azureNativeFileSystemStore.waitForCopyToComplete
+ backingStore.copy(convertUriToDecodedString(source), convertUriToDecodedString(uri));
+ //TODO: set the backingStore.properties.CopyState and
+ // update azureNativeFileSystemStore.waitForCopyToComplete
}
@Override
public CopyState getCopyState() {
- return this.properties.getCopyState();
+ return this.properties.getCopyState();
}
@Override
- public void delete(OperationContext opContext) throws StorageException {
- backingStore.delete(uri.toString());
+ public void delete(OperationContext opContext, SelfRenewingLease lease)
+ throws StorageException {
+ backingStore.delete(convertUriToDecodedString(uri));
}
@Override
public boolean exists(OperationContext opContext) throws StorageException {
- return backingStore.exists(uri.toString());
+ return backingStore.exists(convertUriToDecodedString(uri));
}
@Override
@@ -383,37 +440,90 @@ public class MockStorageInterface extends StorageInterface {
@Override
public InputStream openInputStream(BlobRequestOptions options,
OperationContext opContext) throws StorageException {
- return new ByteArrayInputStream(backingStore.getContent(uri.toString()));
+ return new ByteArrayInputStream(
+ backingStore.getContent(convertUriToDecodedString(uri)));
+ }
+
+ @Override
+ public void uploadMetadata(OperationContext opContext)
+ throws StorageException {
+ backingStore.setMetadata(convertUriToDecodedString(uri), metadata);
+ }
+
+ @Override
+ public void downloadRange(long offset, long length, OutputStream os,
+ BlobRequestOptions options, OperationContext opContext)
+ throws StorageException {
+ throw new NotImplementedException();
+ }
+ }
+
+ class MockCloudBlockBlobWrapper extends MockCloudBlobWrapper
+ implements CloudBlockBlobWrapper {
+ public MockCloudBlockBlobWrapper(URI uri, HashMap<String, String> metadata,
+ int length) {
+ super(uri, metadata, length);
}
@Override
public OutputStream openOutputStream(BlobRequestOptions options,
OperationContext opContext) throws StorageException {
- return backingStore.upload(uri.toString(), metadata);
+ return backingStore.uploadBlockBlob(convertUriToDecodedString(uri),
+ metadata);
}
@Override
- public void upload(InputStream sourceStream, OperationContext opContext)
- throws StorageException, IOException {
- ByteArrayOutputStream allContent = new ByteArrayOutputStream();
- allContent.write(sourceStream);
- backingStore.setContent(uri.toString(), allContent.toByteArray(),
- metadata);
- refreshProperties(false);
- allContent.close();
+ public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
}
@Override
- public void uploadMetadata(OperationContext opContext)
- throws StorageException {
- backingStore.setContent(uri.toString(),
- backingStore.getContent(uri.toString()), metadata);
+ public void setWriteBlockSizeInBytes(int writeBlockSizeBytes) {
}
@Override
- public void uploadProperties(OperationContext opContext)
- throws StorageException {
- refreshProperties(false);
+ public StorageUri getStorageUri() {
+ return null;
+ }
+
+ @Override
+ public void uploadProperties(OperationContext context, SelfRenewingLease lease) {
+ }
+
+ @Override
+ public SelfRenewingLease acquireLease() {
+ return null;
+ }
+
+ @Override
+ public CloudBlob getBlob() {
+ return null;
+ }
+ }
+
+ class MockCloudPageBlobWrapper extends MockCloudBlobWrapper
+ implements CloudPageBlobWrapper {
+ public MockCloudPageBlobWrapper(URI uri, HashMap<String, String> metadata,
+ int length) {
+ super(uri, metadata, length);
+ }
+
+ @Override
+ public void create(long length, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void uploadPages(InputStream sourceStream, long offset, long length,
+ BlobRequestOptions options, OperationContext opContext)
+ throws StorageException, IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public ArrayList<PageRange> downloadPageRanges(BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ throw new NotImplementedException();
}
@Override
@@ -426,8 +536,23 @@ public class MockStorageInterface extends StorageInterface {
@Override
public StorageUri getStorageUri() {
- throw new UnsupportedOperationException();
+ throw new NotImplementedException();
}
+ @Override
+ public void uploadProperties(OperationContext opContext,
+ SelfRenewingLease lease)
+ throws StorageException {
+ }
+
+ @Override
+ public SelfRenewingLease acquireLease() {
+ return null;
+ }
+
+ @Override
+ public CloudBlob getBlob() {
+ return null;
+ }
}
}
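
The convertUriToDecodedString / convertKeyToEncodedUri helpers added above keep the in-memory backing store keyed by raw, decoded UTF-8 strings while the mock blob wrappers carry percent-encoded URIs. A minimal, self-contained sketch of that round trip, using the same commons-httpclient URIUtil calls the mock relies on (the class name and sample key below are illustrative only, not part of the patch):

    import java.net.URI;
    import java.net.URISyntaxException;

    import org.apache.commons.httpclient.URIException;
    import org.apache.commons.httpclient.util.URIUtil;

    public class UriKeyRoundTrip {
      public static void main(String[] args) throws URIException, URISyntaxException {
        // A blob key containing a space and a non-ASCII character, the kind of
        // key the Chinese-character tests below feed through the mock.
        String key = "user/test/file to rename\u963f";

        // Encode the raw key into a legal URI, as convertKeyToEncodedUri does.
        URI encoded = new URI(URIUtil.encodePath(key));
        System.out.println("encoded: " + encoded);

        // Decode it again before using it as a backing-store key, as
        // convertUriToDecodedString does, so the store sees the original form.
        String decoded = URIUtil.decode(encoded.toString());
        System.out.println("decoded: " + decoded);
        System.out.println("round trip ok: " + key.equals(decoded));
      }
    }
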
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
index e731b21..01cf713 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.junit.Assume.assumeNotNull;
import java.io.BufferedReader;
@@ -32,10 +33,15 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -49,6 +55,13 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.apache.hadoop.fs.azure.AzureException;
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
+
+import com.microsoft.windowsazure.storage.AccessCondition;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+
/*
* Tests the Native Azure file system (WASB) against an actual blob store if
* provided in the environment.
@@ -59,12 +72,13 @@ import org.junit.Test;
*/
public abstract class NativeAzureFileSystemBaseTest {
- private FileSystem fs;
+ protected FileSystem fs;
private AzureBlobStorageTestAccount testAccount;
private final long modifiedTimeErrorMargin = 5 * 1000; // Give it +/-5 seconds
- protected abstract AzureBlobStorageTestAccount createTestAccount()
- throws Exception;
+ protected abstract AzureBlobStorageTestAccount createTestAccount() throws Exception;
+
+ public static final Log LOG = LogFactory.getLog(NativeAzureFileSystemBaseTest.class);
@Before
public void setUp() throws Exception {
@@ -140,7 +154,7 @@ public abstract class NativeAzureFileSystemBaseTest {
private void testOwnership(Path pathUnderTest) throws IOException {
FileStatus ret = fs.getFileStatus(pathUnderTest);
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- assertEquals(ret.getOwner(), currentUser.getShortUserName());
+ assertTrue(ret.getOwner().equals(currentUser.getShortUserName()));
fs.delete(pathUnderTest, true);
}
@@ -177,18 +191,29 @@ public abstract class NativeAzureFileSystemBaseTest {
fs.delete(testFolder, true);
}
- @Test
- public void testDeepFileCreation() throws Exception {
- Path testFile = new Path("deep/file/creation/test");
- FsPermission permission = FsPermission.createImmutable((short) 644);
+ void testDeepFileCreationBase(String testFilePath, String firstDirPath, String middleDirPath,
+ short permissionShort, short umaskedPermissionShort) throws Exception {
+ Path testFile = new Path(testFilePath);
+ Path firstDir = new Path(firstDirPath);
+ Path middleDir = new Path(middleDirPath);
+ FsPermission permission = FsPermission.createImmutable(permissionShort);
+ FsPermission umaskedPermission = FsPermission.createImmutable(umaskedPermissionShort);
+
createEmptyFile(testFile, permission);
+ FsPermission rootPerm = fs.getFileStatus(firstDir.getParent()).getPermission();
+ FsPermission inheritPerm = FsPermission.createImmutable((short)(rootPerm.toShort() | 0300));
assertTrue(fs.exists(testFile));
- assertTrue(fs.exists(new Path("deep")));
- assertTrue(fs.exists(new Path("deep/file/creation")));
- FileStatus ret = fs.getFileStatus(new Path("deep/file"));
- assertTrue(ret.isDirectory());
- assertEqualsIgnoreStickyBit(permission, ret.getPermission());
- assertTrue(fs.delete(new Path("deep"), true));
+ assertTrue(fs.exists(firstDir));
+ assertTrue(fs.exists(middleDir));
+ // verify that the indirectly created directory inherited its permissions from the root directory
+ FileStatus directoryStatus = fs.getFileStatus(middleDir);
+ assertTrue(directoryStatus.isDirectory());
+ assertEqualsIgnoreStickyBit(inheritPerm, directoryStatus.getPermission());
+ // verify that the file itself has the permissions as specified
+ FileStatus fileStatus = fs.getFileStatus(testFile);
+ assertFalse(fileStatus.isDirectory());
+ assertEqualsIgnoreStickyBit(umaskedPermission, fileStatus.getPermission());
+ assertTrue(fs.delete(firstDir, true));
assertFalse(fs.exists(testFile));
// An alternative test scenario would've been to delete the file first,
@@ -196,6 +221,22 @@ public abstract class NativeAzureFileSystemBaseTest {
// doesn't actually work as expected right now.
}
+ @Test
+ public void testDeepFileCreation() throws Exception {
+ // normal permissions in user home
+ testDeepFileCreationBase("deep/file/creation/test", "deep", "deep/file/creation", (short)0644, (short)0644);
+ // extra permissions in user home. umask will change the actual permissions.
+ testDeepFileCreationBase("deep/file/creation/test", "deep", "deep/file/creation", (short)0777, (short)0755);
+ // normal permissions in root
+ testDeepFileCreationBase("/deep/file/creation/test", "/deep", "/deep/file/creation", (short)0644, (short)0644);
+ // less permissions in root
+ testDeepFileCreationBase("/deep/file/creation/test", "/deep", "/deep/file/creation", (short)0700, (short)0700);
+ // one indirectly created directory in root
+ testDeepFileCreationBase("/deep/file", "/deep", "/deep", (short)0644, (short)0644);
+ // one indirectly created directory in user home
+ testDeepFileCreationBase("deep/file", "deep", "deep", (short)0644, (short)0644);
+ }
+
private static enum RenameVariation {
NormalFileName, SourceInAFolder, SourceWithSpace, SourceWithPlusAndPercent
}
@@ -206,20 +247,20 @@ public abstract class NativeAzureFileSystemBaseTest {
System.out.printf("Rename variation: %s\n", variation);
Path originalFile;
switch (variation) {
- case NormalFileName:
- originalFile = new Path("fileToRename");
- break;
- case SourceInAFolder:
- originalFile = new Path("file/to/rename");
- break;
- case SourceWithSpace:
- originalFile = new Path("file to rename");
- break;
- case SourceWithPlusAndPercent:
- originalFile = new Path("file+to%rename");
- break;
- default:
- throw new Exception("Unknown variation");
+ case NormalFileName:
+ originalFile = new Path("fileToRename");
+ break;
+ case SourceInAFolder:
+ originalFile = new Path("file/to/rename");
+ break;
+ case SourceWithSpace:
+ originalFile = new Path("file to rename");
+ break;
+ case SourceWithPlusAndPercent:
+ originalFile = new Path("file+to%rename");
+ break;
+ default:
+ throw new Exception("Unknown variation");
}
Path destinationFile = new Path("file/resting/destination");
assertTrue(fs.createNewFile(originalFile));
@@ -227,7 +268,8 @@ public abstract class NativeAzureFileSystemBaseTest {
assertFalse(fs.rename(originalFile, destinationFile)); // Parent directory
// doesn't exist
assertTrue(fs.mkdirs(destinationFile.getParent()));
- assertTrue(fs.rename(originalFile, destinationFile));
+ boolean result = fs.rename(originalFile, destinationFile);
+ assertTrue(result);
assertTrue(fs.exists(destinationFile));
assertFalse(fs.exists(originalFile));
fs.delete(destinationFile.getParent(), true);
@@ -239,10 +281,10 @@ public abstract class NativeAzureFileSystemBaseTest {
Path testFile = new Path("deep/file/rename/test");
FsPermission permission = FsPermission.createImmutable((short) 644);
createEmptyFile(testFile, permission);
- assertTrue(fs.rename(new Path("deep/file"), new Path("deep/renamed")));
+ boolean renameResult = fs.rename(new Path("deep/file"), new Path("deep/renamed"));
+ assertTrue(renameResult);
assertFalse(fs.exists(testFile));
- FileStatus newStatus = fs
- .getFileStatus(new Path("deep/renamed/rename/test"));
+ FileStatus newStatus = fs.getFileStatus(new Path("deep/renamed/rename/test"));
assertNotNull(newStatus);
assertEqualsIgnoreStickyBit(permission, newStatus.getPermission());
assertTrue(fs.delete(new Path("deep"), true));
@@ -256,21 +298,25 @@ public abstract class NativeAzureFileSystemBaseTest {
public void testRenameFolder() throws Exception {
for (RenameFolderVariation variation : RenameFolderVariation.values()) {
Path originalFolder = new Path("folderToRename");
- if (variation != RenameFolderVariation.CreateJustInnerFile){
+ if (variation != RenameFolderVariation.CreateJustInnerFile) {
assertTrue(fs.mkdirs(originalFolder));
}
Path innerFile = new Path(originalFolder, "innerFile");
- if (variation != RenameFolderVariation.CreateJustFolder){
+ Path innerFile2 = new Path(originalFolder, "innerFile2");
+ if (variation != RenameFolderVariation.CreateJustFolder) {
assertTrue(fs.createNewFile(innerFile));
+ assertTrue(fs.createNewFile(innerFile2));
}
Path destination = new Path("renamedFolder");
assertTrue(fs.rename(originalFolder, destination));
assertTrue(fs.exists(destination));
- if (variation != RenameFolderVariation.CreateJustFolder){
+ if (variation != RenameFolderVariation.CreateJustFolder) {
assertTrue(fs.exists(new Path(destination, innerFile.getName())));
+ assertTrue(fs.exists(new Path(destination, innerFile2.getName())));
}
assertFalse(fs.exists(originalFolder));
assertFalse(fs.exists(innerFile));
+ assertFalse(fs.exists(innerFile2));
fs.delete(destination, true);
}
}
@@ -365,6 +411,43 @@ public abstract class NativeAzureFileSystemBaseTest {
}
@Test
+ public void testChineseCharacters() throws Exception {
+ // Create a file and a folder with Chinese (non-ASCII) characters
+ String chinese = "" + '\u963f' + '\u4db5';
+ String fileName = "filename" + chinese;
+ String directoryName = chinese;
+ fs.create(new Path(directoryName, fileName)).close();
+ FileStatus[] listing = fs.listStatus(new Path(directoryName));
+ assertEquals(1, listing.length);
+ assertEquals(fileName, listing[0].getPath().getName());
+ FileStatus status = fs.getFileStatus(new Path(directoryName, fileName));
+ assertEquals(fileName, status.getPath().getName());
+ InputStream stream = fs.open(new Path(directoryName, fileName));
+ assertNotNull(stream);
+ stream.close();
+ assertTrue(fs.delete(new Path(directoryName, fileName), true));
+ assertTrue(fs.delete(new Path(directoryName), true));
+ }
+
+ @Test
+ public void testChineseCharactersFolderRename() throws Exception {
+ // Create a file and a folder with Chinese (non-ASCII) characters
+ String chinese = "" + '\u963f' + '\u4db5';
+ String fileName = "filename" + chinese;
+ String srcDirectoryName = chinese;
+ String targetDirectoryName = "target" + chinese;
+ fs.create(new Path(srcDirectoryName, fileName)).close();
+ fs.rename(new Path(srcDirectoryName), new Path(targetDirectoryName));
+ FileStatus[] listing = fs.listStatus(new Path(targetDirectoryName));
+ assertEquals(1, listing.length);
+ assertEquals(fileName, listing[0].getPath().getName());
+ FileStatus status = fs.getFileStatus(new Path(targetDirectoryName, fileName));
+ assertEquals(fileName, status.getPath().getName());
+ assertTrue(fs.delete(new Path(targetDirectoryName, fileName), true));
+ assertTrue(fs.delete(new Path(targetDirectoryName), true));
+ }
+
+ @Test
public void testReadingDirectoryAsFile() throws Exception {
Path dir = new Path("/x");
assertTrue(fs.mkdirs(dir));
@@ -403,7 +486,12 @@ public abstract class NativeAzureFileSystemBaseTest {
assertEquals("supergroup", newStatus.getGroup());
assertEquals(UserGroupInformation.getCurrentUser().getShortUserName(),
newStatus.getOwner());
- assertEquals(1, newStatus.getLen());
+
+ // Don't check the file length for page blobs. Only block blobs
+ // provide the actual length of bytes written.
+ if (!(this instanceof TestNativeAzureFSPageBlobLive)) {
+ assertEquals(1, newStatus.getLen());
+ }
}
@Test
@@ -429,7 +517,12 @@ public abstract class NativeAzureFileSystemBaseTest {
assertNotNull(newStatus);
assertEquals("newUser", newStatus.getOwner());
assertEquals("supergroup", newStatus.getGroup());
- assertEquals(1, newStatus.getLen());
+
+ // File length is only reported to be the size of bytes written to the file for block blobs.
+ // So only check it for block blobs, not page blobs.
+ if (!(this instanceof TestNativeAzureFSPageBlobLive)) {
+ assertEquals(1, newStatus.getLen());
+ }
fs.setOwner(newFile, null, "newGroup");
newStatus = fs.getFileStatus(newFile);
assertNotNull(newStatus);
@@ -506,14 +599,570 @@ public abstract class NativeAzureFileSystemBaseTest {
testModifiedTime(destFolder);
}
+ /**
+ * Verify we can get file status of a directory with various forms of
+ * the directory file name, including the nonstandard but legal form
+ * ending in "/.". Check that we're getting status for a directory.
+ */
@Test
public void testListSlash() throws Exception {
Path testFolder = new Path("/testFolder");
Path testFile = new Path(testFolder, "testFile");
assertTrue(fs.mkdirs(testFolder));
assertTrue(fs.createNewFile(testFile));
- FileStatus status = fs.getFileStatus(new Path("/testFolder/."));
- assertNotNull(status);
+ FileStatus status;
+ status = fs.getFileStatus(new Path("/testFolder"));
+ assertTrue(status.isDirectory());
+ status = fs.getFileStatus(new Path("/testFolder/"));
+ assertTrue(status.isDirectory());
+ status = fs.getFileStatus(new Path("/testFolder/."));
+ assertTrue(status.isDirectory());
+ }
+
+ @Test
+ public void testCannotCreatePageBlobByDefault() throws Exception {
+
+ // Verify that the page blob directory list configuration setting
+ // is not set in the default configuration.
+ Configuration conf = new Configuration();
+ String[] rawPageBlobDirs =
+ conf.getStrings(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES);
+ assertTrue(rawPageBlobDirs == null);
+ }
+
+ /*
+ * Set up a situation where a folder rename is partway finished.
+ * Then apply redo to finish the rename.
+ *
+ * The original source folder *would* have had contents
+ * folderToRename (0 byte dummy file for directory)
+ * folderToRename/innerFile
+ * folderToRename/innerFile2
+ *
+ * The actual source folder (after partial rename and failure)
+ *
+ * folderToRename
+ * folderToRename/innerFile2
+ *
+ * The actual target folder (after partial rename and failure)
+ *
+ * renamedFolder
+ * renamedFolder/innerFile
+ */
+ @Test
+ public void testRedoRenameFolder() throws IOException {
+ // create original folder
+ String srcKey = "folderToRename";
+ Path originalFolder = new Path(srcKey);
+ assertTrue(fs.mkdirs(originalFolder));
+ Path innerFile = new Path(originalFolder, "innerFile");
+ assertTrue(fs.createNewFile(innerFile));
+ Path innerFile2 = new Path(originalFolder, "innerFile2");
+ assertTrue(fs.createNewFile(innerFile2));
+
+ String dstKey = "renamedFolder";
+
+ // propose (but don't do) the rename
+ Path home = fs.getHomeDirectory();
+ String relativeHomeDir = getRelativePath(home.toString());
+ NativeAzureFileSystem.FolderRenamePending pending =
+ new NativeAzureFileSystem.FolderRenamePending(
+ relativeHomeDir + "/" + srcKey,
+ relativeHomeDir + "/" + dstKey, null,
+ (NativeAzureFileSystem) fs);
+
+ // get the rename pending file contents
+ String renameDescription = pending.makeRenamePendingFileContents();
+
+ // Remove one file from source folder to simulate a partially done
+ // rename operation.
+ assertTrue(fs.delete(innerFile, false));
+
+ // Create the destination folder with just one file in it, again
+ // to simulate a partially done rename.
+ Path destination = new Path(dstKey);
+ Path innerDest = new Path(destination, "innerFile");
+ assertTrue(fs.createNewFile(innerDest));
+
+ // Create a rename-pending file and write rename information to it.
+ final String renamePendingStr = "folderToRename-RenamePending.json";
+ Path renamePendingFile = new Path(renamePendingStr);
+ FSDataOutputStream out = fs.create(renamePendingFile, true);
+ assertTrue(out != null);
+ writeString(out, renameDescription);
+
+ // Redo the rename operation based on the contents of the -RenamePending.json file.
+ // Trigger the redo by checking for existence of the original folder. It must appear
+ // to not exist.
+ assertFalse(fs.exists(originalFolder));
+
+ // Verify that the target is there, and the source is gone.
+ assertTrue(fs.exists(destination));
+ assertTrue(fs.exists(new Path(destination, innerFile.getName())));
+ assertTrue(fs.exists(new Path(destination, innerFile2.getName())));
+ assertFalse(fs.exists(originalFolder));
+ assertFalse(fs.exists(innerFile));
+ assertFalse(fs.exists(innerFile2));
+
+ // Verify that there's no RenamePending file left.
+ assertFalse(fs.exists(renamePendingFile));
+
+ // Verify that we can list the target directory.
+ FileStatus[] listed = fs.listStatus(destination);
+ assertEquals(2, listed.length);
+
+ // List the home directory and show the contents is a directory.
+ Path root = fs.getHomeDirectory();
+ listed = fs.listStatus(root);
+ assertEquals(1, listed.length);
+ assertTrue(listed[0].isDirectory());
+ }
+
+ /**
+ * If there is a folder to be renamed inside a parent folder,
+ * then when you list the parent folder, you should only see
+ * the final result, after the rename.
+ */
+ @Test
+ public void testRedoRenameFolderInFolderListing() throws IOException {
+
+ // create original folder
+ String parent = "parent";
+ Path parentFolder = new Path(parent);
+ assertTrue(fs.mkdirs(parentFolder));
+ Path inner = new Path(parentFolder, "innerFolder");
+ assertTrue(fs.mkdirs(inner));
+ Path inner2 = new Path(parentFolder, "innerFolder2");
+ assertTrue(fs.mkdirs(inner2));
+ Path innerFile = new Path(inner2, "file");
+ assertTrue(fs.createNewFile(innerFile));
+
+ Path inner2renamed = new Path(parentFolder, "innerFolder2Renamed");
+
+ // propose (but don't do) the rename of innerFolder2
+ Path home = fs.getHomeDirectory();
+ String relativeHomeDir = getRelativePath(home.toString());
+ NativeAzureFileSystem.FolderRenamePending pending =
+ new NativeAzureFileSystem.FolderRenamePending(
+ relativeHomeDir + "/" + inner2,
+ relativeHomeDir + "/" + inner2renamed, null,
+ (NativeAzureFileSystem) fs);
+
+ // Create a rename-pending file and write rename information to it.
+ final String renamePendingStr = inner2 + FolderRenamePending.SUFFIX;
+ Path renamePendingFile = new Path(renamePendingStr);
+ FSDataOutputStream out = fs.create(renamePendingFile, true);
+ assertTrue(out != null);
+ writeString(out, pending.makeRenamePendingFileContents());
+
+ // Redo the rename operation based on the contents of the
+ // -RenamePending.json file. Trigger the redo by checking for existence of
+ // the original folder. It must appear to not exist.
+ FileStatus[] listed = fs.listStatus(parentFolder);
+ assertEquals(2, listed.length);
+ assertTrue(listed[0].isDirectory());
+ assertTrue(listed[1].isDirectory());
+
+ // The rename pending file is not a directory, so at this point we know the
+ // redo has been done.
+ assertFalse(fs.exists(inner2)); // verify original folder is gone
+ assertTrue(fs.exists(inner2renamed)); // verify the target is there
+ assertTrue(fs.exists(new Path(inner2renamed, "file")));
+ }
+
+ /**
+ * Test the situation where a rename pending file exists but the rename
+ * is really done. This could happen if the rename process died just
+ * before deleting the rename pending file. It exercises a non-standard
+ * code path in redo().
+ */
+ @Test
+ public void testRenameRedoFolderAlreadyDone() throws IOException {
+ // create only destination folder
+ String orig = "originalFolder";
+ String dest = "renamedFolder";
+ Path destPath = new Path(dest);
+ assertTrue(fs.mkdirs(destPath));
+
+ // propose (but don't do) the rename of innerFolder2
+ Path home = fs.getHomeDirectory();
+ String relativeHomeDir = getRelativePath(home.toString());
+ NativeAzureFileSystem.FolderRenamePending pending =
+ new NativeAzureFileSystem.FolderRenamePending(
+ relativeHomeDir + "/" + orig,
+ relativeHomeDir + "/" + dest, null,
+ (NativeAzureFileSystem) fs);
+
+ // Create a rename-pending file and write rename information to it.
+ final String renamePendingStr = orig + FolderRenamePending.SUFFIX;
+ Path renamePendingFile = new Path(renamePendingStr);
+ FSDataOutputStream out = fs.create(renamePendingFile, true);
+ assertTrue(out != null);
+ writeString(out, pending.makeRenamePendingFileContents());
+
+ try {
+ pending.redo();
+ } catch (Exception e) {
+ fail();
+ }
+
+ // Make sure rename pending file is gone.
+ FileStatus[] listed = fs.listStatus(new Path("/"));
+ assertEquals(1, listed.length);
+ assertTrue(listed[0].isDirectory());
+ }
+
+ @Test
+ public void testRedoFolderRenameAll() throws IllegalArgumentException, IOException {
+ {
+ FileFolder original = new FileFolder("folderToRename");
+ original.add("innerFile").add("innerFile2");
+ FileFolder partialSrc = original.copy();
+ FileFolder partialDst = original.copy();
+ partialDst.setName("renamedFolder");
+ partialSrc.setPresent(0, false);
+ partialDst.setPresent(1, false);
+
+ testRenameRedoFolderSituation(original, partialSrc, partialDst);
+ }
+ {
+ FileFolder original = new FileFolder("folderToRename");
+ original.add("file1").add("file2").add("file3");
+ FileFolder partialSrc = original.copy();
+ FileFolder partialDst = original.copy();
+ partialDst.setName("renamedFolder");
+
+ // Set up this state before the redo:
+ // folderToRename: file1 file3
+ // renamedFolder: file1 file2
+ // This gives code coverage for all 3 expected cases for individual file
+ // redo.
+ partialSrc.setPresent(1, false);
+ partialDst.setPresent(2, false);
+
+ testRenameRedoFolderSituation(original, partialSrc, partialDst);
+ }
+ {
+ // Simulate a situation with folder with a large number of files in it.
+ // For the first half of the files, they will be in the destination
+ // but not the source. For the second half, they will be in the source
+ // but not the destination. There will be one file in the middle that is
+ // in both source and destination. Then trigger redo and verify.
+ // For testing larger folder sizes, manually change this, temporarily, and
+ // edit the SIZE value.
+ final int SIZE = 5;
+ assertTrue(SIZE >= 3);
+ // Try a lot of files in the folder.
+ FileFolder original = new FileFolder("folderToRename");
+ for (int i = 0; i < SIZE; i++) {
+ original.add("file" + Integer.toString(i));
+ }
+ FileFolder partialSrc = original.copy();
+ FileFolder partialDst = original.copy();
+ partialDst.setName("renamedFolder");
+ for (int i = 0; i < SIZE; i++) {
+ partialSrc.setPresent(i, i >= SIZE / 2);
+ partialDst.setPresent(i, i <= SIZE / 2);
+ }
+
+ testRenameRedoFolderSituation(original, partialSrc, partialDst);
+ }
+ {
+ // Do a nested folder, like so:
+ // folderToRename:
+ // nestedFolder: a, b, c
+ // p
+ // q
+ //
+ // Then delete file 'a' from the source and add it to destination.
+ // Then trigger redo.
+
+ FileFolder original = new FileFolder("folderToRename");
+ FileFolder nested = new FileFolder("nestedFolder");
+ nested.add("a").add("b").add("c");
+ original.add(nested).add("p").add("q");
+
+ FileFolder partialSrc = original.copy();
+ FileFolder partialDst = original.copy();
+ partialDst.setName("renamedFolder");
+
+ // logically remove 'a' from source
+ partialSrc.getMember(0).setPresent(0, false);
+
+ // logically eliminate b, c from destination
+ partialDst.getMember(0).setPresent(1, false);
+ partialDst.getMember(0).setPresent(2, false);
+
+ testRenameRedoFolderSituation(original, partialSrc, partialDst);
+ }
+ }
+
+ private void testRenameRedoFolderSituation(
+ FileFolder fullSrc,
+ FileFolder partialSrc,
+ FileFolder partialDst) throws IllegalArgumentException, IOException {
+
+ // make file folder tree for source
+ fullSrc.create();
+
+ // set up rename pending file
+ fullSrc.makeRenamePending(partialDst);
+
+ // prune away some files (as marked) from source to simulate partial rename
+ partialSrc.prune();
+
+ // Create only the files indicated for the destination to indicate a partial rename.
+ partialDst.create();
+
+ // trigger redo
+ assertFalse(fullSrc.exists());
+
+ // verify correct results
+ partialDst.verifyExists();
+ fullSrc.verifyGone();
+
+ // delete the new folder to leave no garbage behind
+ fs.delete(new Path(partialDst.getName()), true);
+ }
+
+ // Mock up of a generalized folder (which can also be a leaf-level file)
+ // for rename redo testing.
+ private class FileFolder {
+ private String name;
+
+ // For rename testing, indicates whether an expected
+ // file is present in the source or target folder.
+ private boolean present;
+ ArrayList<FileFolder> members; // Null if a leaf file, otherwise not null.
+
+ // Make a new, empty folder (not a regular leaf file).
+ public FileFolder(String name) {
+ this.name = name;
+ this.present = true;
+ members = new ArrayList<FileFolder>();
+ }
+
+ public FileFolder getMember(int i) {
+ return members.get(i);
+ }
+
+ // Verify a folder and all its contents are gone. This is only to
+ // be called on the root of a FileFolder.
+ public void verifyGone() throws IllegalArgumentException, IOException {
+ assertFalse(fs.exists(new Path(name)));
+ assertTrue(isFolder());
+ verifyGone(new Path(name), members);
+ }
+
+ private void verifyGone(Path prefix, ArrayList<FileFolder> members2) throws IOException {
+ for (FileFolder f : members2) {
+ f.verifyGone(prefix);
+ }
+ }
+
+ private void verifyGone(Path prefix) throws IOException {
+ assertFalse(fs.exists(new Path(prefix, name)));
+ if (isLeaf()) {
+ return;
+ }
+ for (FileFolder f : members) {
+ f.verifyGone(new Path(prefix, name));
+ }
+ }
+
+ public void verifyExists() throws IllegalArgumentException, IOException {
+
+ // verify the root is present
+ assertTrue(fs.exists(new Path(name)));
+ assertTrue(isFolder());
+
+ // check the members
+ verifyExists(new Path(name), members);
+ }
+
+ private void verifyExists(Path prefix, ArrayList<FileFolder> members2) throws IOException {
+ for (FileFolder f : members2) {
+ f.verifyExists(prefix);
+ }
+ }
+
+ private void verifyExists(Path prefix) throws IOException {
+
+ // verify this file/folder is present
+ assertTrue(fs.exists(new Path(prefix, name)));
+
+ // verify members are present
+ if (isLeaf()) {
+ return;
+ }
+
+ for (FileFolder f : members) {
+ f.verifyExists(new Path(prefix, name));
+ }
+ }
+
+ public boolean exists() throws IOException {
+ return fs.exists(new Path(name));
+ }
+
+ // Make a rename pending file for the situation where we rename
+ // this object (the source) to the specified destination.
+ public void makeRenamePending(FileFolder dst) throws IOException {
+
+ // Propose (but don't do) the rename.
+ Path home = fs.getHomeDirectory();
+ String relativeHomeDir = getRelativePath(home.toString());
+ NativeAzureFileSystem.FolderRenamePending pending =
+ new NativeAzureFileSystem.FolderRenamePending(
+ relativeHomeDir + "/" + this.getName(),
+ relativeHomeDir + "/" + dst.getName(), null,
+ (NativeAzureFileSystem) fs);
+
+ // Get the rename pending file contents.
+ String renameDescription = pending.makeRenamePendingFileContents();
+
+ // Create a rename-pending file and write rename information to it.
+ final String renamePendingStr = this.getName() + "-RenamePending.json";
+ Path renamePendingFile = new Path(renamePendingStr);
+ FSDataOutputStream out = fs.create(renamePendingFile, true);
+ assertTrue(out != null);
+ writeString(out, renameDescription);
+ }
+
+ // set whether a child is present or not
+ public void setPresent(int i, boolean b) {
+ members.get(i).setPresent(b);
+ }
+
+ // Make an uninitialized folder
+ private FileFolder() {
+ this.present = true;
+ }
+
+ public void setPresent(boolean value) {
+ present = value;
+ }
+
+ public FileFolder makeLeaf(String name) {
+ FileFolder f = new FileFolder();
+ f.setName(name);
+ return f;
+ }
+
+ void setName(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isLeaf() {
+ return members == null;
+ }
+
+ public boolean isFolder() {
+ return members != null;
+ }
+
+ FileFolder add(FileFolder folder) {
+ members.add(folder);
+ return this;
+ }
+
+ // Add a leaf file (by convention, if you pass a string argument, you get a leaf).
+ FileFolder add(String file) {
+ FileFolder leaf = makeLeaf(file);
+ members.add(leaf);
+ return this;
+ }
+
+ public FileFolder copy() {
+ if (isLeaf()) {
+ return makeLeaf(name);
+ } else {
+ FileFolder f = new FileFolder(name);
+ for (FileFolder member : members) {
+ f.add(member.copy());
+ }
+ return f;
+ }
+ }
+
+ // Create the folder structure. Return true on success, or else false.
+ public void create() throws IllegalArgumentException, IOException {
+ create(null);
+ }
+
+ private void create(Path prefix) throws IllegalArgumentException, IOException {
+ if (isFolder()) {
+ if (present) {
+ assertTrue(fs.mkdirs(makePath(prefix, name)));
+ }
+ create(makePath(prefix, name), members);
+ } else if (isLeaf()) {
+ if (present) {
+ assertTrue(fs.createNewFile(makePath(prefix, name)));
+ }
+ } else {
+ assertTrue("The object must be a (leaf) file or a folder.", false);
+ }
+ }
+
+ private void create(Path prefix, ArrayList<FileFolder> members2) throws IllegalArgumentException, IOException {
+ for (FileFolder f : members2) {
+ f.create(prefix);
+ }
+ }
+
+ private Path makePath(Path prefix, String name) {
+ if (prefix == null) {
+ return new Path(name);
+ } else {
+ return new Path(prefix, name);
+ }
+ }
+
+ // Remove the files marked as not present.
+ public void prune() throws IOException {
+ prune(null);
+ }
+
+ private void prune(Path prefix) throws IOException {
+ Path path = null;
+ if (prefix == null) {
+ path = new Path(name);
+ } else {
+ path = new Path(prefix, name);
+ }
+ if (isLeaf() && !present) {
+ assertTrue(fs.delete(path, false));
+ } else if (isFolder() && !present) {
+ assertTrue(fs.delete(path, true));
+ } else if (isFolder()) {
+ for (FileFolder f : members) {
+ f.prune(path);
+ }
+ }
+ }
+ }
+
+ private String getRelativePath(String path) {
+ // example input: wasb://wasbtests-ehans-1404322046279@ehans9.blob.core.windows.net/user/ehans/folderToRename
+ // example result: user/ehans/folderToRename
+
+ // Find the third / position and return input substring after that.
+ int slashCount = 0; // number of slashes so far
+ int i;
+ for (i = 0; i < path.length(); i++) {
+ if (path.charAt(i) == '/') {
+ slashCount++;
+ if (slashCount == 3) {
+ return path.substring(i + 1, path.length());
+ }
+ }
+ }
+ throw new RuntimeException("Incorrect path prefix -- expected wasb://.../...");
}
@Test
@@ -523,6 +1172,84 @@ public abstract class NativeAzureFileSystemBaseTest {
fs.close();
}
+ // Test the available() method for the input stream returned by fs.open().
+ // This works for both page and block blobs.
+ int FILE_SIZE = 4 * 1024 * 1024 + 1; // Make this 1 bigger than internal
+ // buffer used in BlobInputStream
+ // to exercise that case.
+ int MAX_STRIDE = FILE_SIZE + 1;
+ Path PATH = new Path("/available.dat");
+ @Test
+ public void testAvailable() throws IOException {
+
+ // write FILE_SIZE bytes to page blob
+ FSDataOutputStream out = fs.create(PATH);
+ byte[] data = new byte[FILE_SIZE];
+ Arrays.fill(data, (byte) 5);
+ out.write(data, 0, FILE_SIZE);
+ out.close();
+
+ // Test available() for different read sizes
+ verifyAvailable(1);
+ verifyAvailable(100);
+ verifyAvailable(5000);
+ verifyAvailable(FILE_SIZE);
+ verifyAvailable(MAX_STRIDE);
+
+ fs.delete(PATH, false);
+ }
+
+ // Verify that available() for the input stream is always >= 1 unless we've
+ // consumed all the input, and then it is 0. This is to match expectations by
+ // HBase which were set based on behavior of DFSInputStream.available().
+ private void verifyAvailable(int readStride) throws IOException {
+ FSDataInputStream in = fs.open(PATH);
+ try {
+ byte[] inputBuffer = new byte[MAX_STRIDE];
+ int position = 0;
+ int bytesRead = 0;
+ while(bytesRead != FILE_SIZE) {
+ bytesRead += in.read(inputBuffer, position, readStride);
+ int available = in.available();
+ if (bytesRead < FILE_SIZE) {
+ if (available < 1) {
+ fail(String.format(
+ "expected available > 0 but got: "
+ + "position = %d, bytesRead = %d, in.available() = %d",
+ position, bytesRead, available));
+ }
+ }
+ }
+ int available = in.available();
+ assertTrue(available == 0);
+ } finally {
+ in.close();
+ }
+ }
+
+ @Test
+ public void testGetFileSizeFromListing() throws IOException {
+ Path path = new Path("file.dat");
+ final int PAGE_SIZE = 512;
+ final int FILE_SIZE = PAGE_SIZE + 1;
+
+ // write FILE_SIZE bytes to page blob
+ FSDataOutputStream out = fs.create(path);
+ byte[] data = new byte[FILE_SIZE];
+ Arrays.fill(data, (byte) 5);
+ out.write(data, 0, FILE_SIZE);
+ out.close();
+
+ // list the file to get its properties
+ FileStatus[] status = fs.listStatus(path);
+ assertEquals(1, status.length);
+
+ // The file length should report the number of bytes
+ // written for either page or block blobs (subclasses
+ // of this test class will exercise both).
+ assertEquals(FILE_SIZE, status[0].getLen());
+ }
+
private boolean testModifiedTime(Path testPath, long time) throws Exception {
FileStatus fileStatus = fs.getFileStatus(testPath);
final long errorMargin = modifiedTimeErrorMargin;
@@ -530,16 +1257,45 @@ public abstract class NativeAzureFileSystemBaseTest {
return (lastModified > (time - errorMargin) && lastModified < (time + errorMargin));
}
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testCreateNonRecursive() throws Exception {
+ Path testFolder = new Path("/testFolder");
+ Path testFile = new Path(testFolder, "testFile");
+ try {
+ fs.createNonRecursive(testFile, true, 1024, (short)1, 1024, null);
+ assertTrue("Should've thrown", false);
+ } catch (FileNotFoundException e) {
+ }
+ fs.mkdirs(testFolder);
+ fs.createNonRecursive(testFile, true, 1024, (short)1, 1024, null)
+ .close();
+ assertTrue(fs.exists(testFile));
+ }
+
+ public void testFileEndingInDot() throws Exception {
+ Path testFolder = new Path("/testFolder.");
+ Path testFile = new Path(testFolder, "testFile.");
+ assertTrue(fs.mkdirs(testFolder));
+ assertTrue(fs.createNewFile(testFile));
+ assertTrue(fs.exists(testFile));
+ FileStatus[] listed = fs.listStatus(testFolder);
+ assertEquals(1, listed.length);
+ assertEquals("testFile.", listed[0].getPath().getName());
+ }
private void testModifiedTime(Path testPath) throws Exception {
Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
long currentUtcTime = utc.getTime().getTime();
FileStatus fileStatus = fs.getFileStatus(testPath);
- assertTrue("Modification time "
- + new Date(fileStatus.getModificationTime()) + " is not close to now: "
- + utc.getTime(), testModifiedTime(testPath, currentUtcTime));
+ final long errorMargin = 10 * 1000; // Give it +/-10 seconds
+ assertTrue("Modification time " +
+ new Date(fileStatus.getModificationTime()) + " is not close to now: " +
+ utc.getTime(),
+ fileStatus.getModificationTime() > (currentUtcTime - errorMargin) &&
+ fileStatus.getModificationTime() < (currentUtcTime + errorMargin));
}
- private void createEmptyFile(Path testFile, FsPermission permission)
+ private void createEmptyFile(Path testFile, FsPermission permission)
throws IOException {
FSDataOutputStream outputStream = fs.create(testFile, permission, true,
4096, (short) 1, 1024, null);
@@ -563,7 +1319,7 @@ public abstract class NativeAzureFileSystemBaseTest {
final int BUFFER_SIZE = 1024;
char[] buffer = new char[BUFFER_SIZE];
int count = reader.read(buffer, 0, BUFFER_SIZE);
- if (count >= BUFFER_SIZE) {
+ if (count > BUFFER_SIZE) {
throw new IOException("Exceeded buffer size");
}
inputStream.close();
@@ -578,7 +1334,6 @@ public abstract class NativeAzureFileSystemBaseTest {
throws IOException {
FSDataOutputStream outputStream = fs.create(path, true);
writeString(outputStream, value);
- outputStream.close();
}
private void writeString(FSDataOutputStream outputStream, String value)
@@ -588,4 +1343,175 @@ public abstract class NativeAzureFileSystemBaseTest {
writer.write(value);
writer.close();
}
+
+ @Test
+ // Acquire and free a Lease object. Wait for more than the lease
+ // timeout, to make sure the lease renews itself.
+ public void testSelfRenewingLease() throws IllegalArgumentException, IOException,
+ InterruptedException, StorageException {
+
+ SelfRenewingLease lease;
+ final String FILE_KEY = "file";
+ fs.create(new Path(FILE_KEY));
+ NativeAzureFileSystem nfs = (NativeAzureFileSystem) fs;
+ String fullKey = nfs.pathToKey(nfs.makeAbsolute(new Path(FILE_KEY)));
+ AzureNativeFileSystemStore store = nfs.getStore();
+ lease = store.acquireLease(fullKey);
+ assertTrue(lease.getLeaseID() != null);
+
+ // The sleep time for the keep-alive thread is 40 seconds, so sleep just
+ // a little beyond that, to make sure the keep-alive thread wakes up
+ // and renews the lease.
+ Thread.sleep(42000);
+ lease.free();
+
+ // Check that the lease is really freed.
+ CloudBlob blob = lease.getCloudBlob();
+
+ // Try to acquire it again, using direct Azure blob access.
+ // If that succeeds, then the lease was already freed.
+ String differentLeaseID = null;
+ try {
+ differentLeaseID = blob.acquireLease(15, null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Caught exception trying to directly re-acquire lease from Azure");
+ } finally {
+ assertTrue(differentLeaseID != null);
+ AccessCondition accessCondition = AccessCondition.generateEmptyCondition();
+ accessCondition.setLeaseID(differentLeaseID);
+ blob.releaseLease(accessCondition);
+ }
+ }
+
+ @Test
+ // Acquire a SelfRenewingLease object. Wait for more than the lease
+ // timeout, to make sure the lease renews itself. Delete the file.
+ // That will automatically free the lease.
+ // (that should work without any failures).
+ public void testSelfRenewingLeaseFileDelete()
+ throws IllegalArgumentException, IOException,
+ InterruptedException, StorageException {
+
+ SelfRenewingLease lease;
+ final String FILE_KEY = "file";
+ final Path path = new Path(FILE_KEY);
+ fs.create(path);
+ NativeAzureFileSystem nfs = (NativeAzureFileSystem) fs;
+ String fullKey = nfs.pathToKey(nfs.makeAbsolute(path));
+ lease = nfs.getStore().acquireLease(fullKey);
+ assertTrue(lease.getLeaseID() != null);
+
+ // The sleep time for the keep-alive thread is 40 seconds, so sleep just
+ // a little beyond that, to make sure the keep-alive thread wakes up
+ // and renews the lease.
+ Thread.sleep(42000);
+
+ nfs.getStore().delete(fullKey, lease);
+
+ // Check that the file is really gone and the lease is freed.
+ assertTrue(!fs.exists(path));
+ assertTrue(lease.isFreed());
+ }
+
+ // Variables to check assertions in next test.
+ private long firstEndTime;
+ private long secondStartTime;
+
+ // Create two threads. One will get a lease on a file.
+ // The second one will try to get the lease and thus block.
+ // Then the first one will free the lease and the second
+ // one will get it and proceed.
+ @Test
+ public void testLeaseAsDistributedLock() throws IllegalArgumentException,
+ IOException {
+ final String LEASE_LOCK_FILE_KEY = "file";
+ fs.create(new Path(LEASE_LOCK_FILE_KEY));
+ NativeAzureFileSystem nfs = (NativeAzureFileSystem) fs;
+ String fullKey = nfs.pathToKey(nfs.makeAbsolute(new Path(LEASE_LOCK_FILE_KEY)));
+
+ Thread first = new Thread(new LeaseLockAction("first-thread", fullKey));
+ first.start();
+ Thread second = new Thread(new LeaseLockAction("second-thread", fullKey));
+ second.start();
+ try {
+
+ // Wait for the two threads to finish.
+ first.join();
+ second.join();
+ assertTrue(firstEndTime < secondStartTime);
+ } catch (InterruptedException e) {
+ fail("Unable to wait for threads to finish");
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private class LeaseLockAction implements Runnable {
+ private String name;
+ private String key;
+
+ LeaseLockAction(String name, String key) {
+ this.name = name;
+ this.key = key;
+ }
+
+ @Override
+ public void run() {
+ LOG.info("starting thread " + name);
+ SelfRenewingLease lease = null;
+ NativeAzureFileSystem nfs = (NativeAzureFileSystem) fs;
+
+ if (name.equals("first-thread")) {
+ try {
+ lease = nfs.getStore().acquireLease(key);
+ LOG.info(name + " acquired lease " + lease.getLeaseID());
+ } catch (AzureException e) {
+ assertTrue("Unanticipated exception", false);
+ }
+ assertTrue(lease != null);
+ try {
+
+ // Sleep long enough for the lease to renew once.
+ Thread.sleep(SelfRenewingLease.LEASE_RENEWAL_PERIOD + 2000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ try {
+ firstEndTime = System.currentTimeMillis();
+ lease.free();
+ LOG.info(name + " freed lease " + lease.getLeaseID());
+ } catch (StorageException e) {
+ fail("Unanticipated exception");
+ }
+ } else if (name.equals("second-thread")) {
+ try {
+
+ // sleep 2 sec to let first thread get ahead of this one
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ try {
+ LOG.info(name + " before getting lease");
+ lease = nfs.getStore().acquireLease(key);
+ secondStartTime = System.currentTimeMillis();
+ LOG.info(name + " acquired lease " + lease.getLeaseID());
+ } catch (AzureException e) {
+ assertTrue("Unanticipated exception", false);
+ }
+ assertTrue(lease != null);
+ try {
+ lease.free();
+ LOG.info(name + " freed lease " + lease.getLeaseID());
+ } catch (StorageException e) {
+ assertTrue("Unanticipated exception", false);
+ }
+ } else {
+ assertTrue("Unknown thread name", false);
+ }
+
+ LOG.info(name + " is exiting.");
+ }
+
+ }
}
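
The redo tests above all follow one driver pattern: write a -RenamePending.json file describing the rename, leave the source and destination half-moved, then trigger recovery simply by probing the source path. A condensed sketch of that pattern, reusing only FolderRenamePending calls that appear in this patch (the wrapping method, the key names, and the reuse of the getRelativePath/writeString test helpers are placeholders, not part of the commit):

    // Sketch only: assumes 'fs' is a live NativeAzureFileSystem instance and
    // that getRelativePath() and writeString() are the test helpers above.
    void proposeAndRedoRename(String srcKey, String dstKey) throws IOException {
      NativeAzureFileSystem nfs = (NativeAzureFileSystem) fs;
      String home = getRelativePath(fs.getHomeDirectory().toString());

      // Describe (but do not perform) the rename.
      NativeAzureFileSystem.FolderRenamePending pending =
          new NativeAzureFileSystem.FolderRenamePending(
              home + "/" + srcKey, home + "/" + dstKey, null, nfs);

      // Persist the description as <srcKey>-RenamePending.json next to the source.
      Path pendingFile = new Path(srcKey + FolderRenamePending.SUFFIX);
      FSDataOutputStream out = fs.create(pendingFile, true);
      writeString(out, pending.makeRenamePendingFileContents());

      // A later lookup of the source notices the pending file and finishes the
      // rename before answering, so the source must now appear to be gone.
      assertFalse(fs.exists(new Path(srcKey)));
    }
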
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt
new file mode 100644
index 0000000..54ba4d8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt
@@ -0,0 +1,22 @@
+========================================================================
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+=========================================================================
+
+In order to run Windows Azure Storage Blob (WASB) unit tests against a live
+Azure Storage account, you need to provide test account details in a configuration
+file called azure-test.xml. See hadoop-tools/hadoop-azure/README.txt for details
+on configuration, and how to run the tests.
\ No newline at end of file
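The azure-test.xml referred to above is an ordinary Hadoop configuration file. As an illustrative sketch only, a minimal file pointing the live tests at a real storage account looks roughly like the following; the exact property names and value formats should be taken from hadoop-tools/hadoop-azure/README.txt rather than from this sketch.

  <?xml version="1.0"?>
  <configuration>
    <!-- Illustrative sketch; confirm the property names in README.txt. -->
    <property>
      <name>fs.azure.test.account.name</name>
      <value>youraccount</value>
    </property>
    <property>
      <name>fs.azure.account.key.youraccount.blob.core.windows.net</name>
      <value>YOUR_STORAGE_ACCOUNT_ACCESS_KEY</value>
    </property>
  </configuration>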
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
index c10ac0f..0894cf5 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
@@ -22,11 +22,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeNotNull;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
import java.util.Arrays;
+import org.apache.hadoop.fs.azure.AzureException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.junit.After;
@@ -100,13 +99,14 @@ public class TestAzureConcurrentOutOfBandIo {
public void run() {
byte[] dataBlockWrite = new byte[UPLOAD_BLOCK_SIZE];
- DataOutputStream outputStream = null;
+ OutputStream outputStream = null;
try {
for (int i = 0; !done; i++) {
// Write two 4 MB blocks to the blob.
//
- outputStream = writerStorageAccount.getStore().storefile(key,
+ outputStream = writerStorageAccount.getStore().storefile(
+ key,
new PermissionStatus("", "", FsPermission.getDefault()));
Arrays.fill(dataBlockWrite, (byte) (i % 256));
@@ -124,7 +124,7 @@ public class TestAzureConcurrentOutOfBandIo {
} catch (IOException e) {
System.out
.println("DatablockWriter thread encountered an I/O exception."
- + e.getMessage());
+ + e.getMessage());
}
}
}
@@ -137,30 +137,29 @@ public class TestAzureConcurrentOutOfBandIo {
// Write to blob to make sure it exists.
//
- // Write five 4 MB blocks to the blob. To ensure there is data in the blob
- // before reading. This eliminates the race between the reader and writer
- // threads.
- DataOutputStream outputStream = testAccount.getStore().storefile(
- "WASB_String.txt",
- new PermissionStatus("", "", FsPermission.getDefault()));
- Arrays.fill(dataBlockWrite, (byte) 255);
- for (int i = 0; i < NUMBER_OF_BLOCKS; i++) {
- outputStream.write(dataBlockWrite);
- }
-
- outputStream.flush();
- outputStream.close();
-
- // Start writing blocks to Azure store using the DataBlockWriter thread.
+ // Write five 4 MB blocks to the blob to ensure there is data in the blob before
+ // reading. This eliminates the race between the reader and writer threads.
+ OutputStream outputStream = testAccount.getStore().storefile(
+ "WASB_String.txt",
+ new PermissionStatus("", "", FsPermission.getDefault()));
+ Arrays.fill(dataBlockWrite, (byte) 255);
+ for (int i = 0; i < NUMBER_OF_BLOCKS; i++) {
+ outputStream.write(dataBlockWrite);
+ }
+
+ outputStream.flush();
+ outputStream.close();
+
+ // Start writing blocks to Azure store using the DataBlockWriter thread.
DataBlockWriter writeBlockTask = new DataBlockWriter(testAccount,
"WASB_String.txt");
- writeBlockTask.startWriting();
- int count = 0;
- DataInputStream inputStream = null;
+ writeBlockTask.startWriting();
+ int count = 0;
+ DataInputStream inputStream = null;
- for (int i = 0; i < 5; i++) {
- try {
- inputStream = testAccount.getStore().retrieve("WASB_String.txt", 0);
+ for (int i = 0; i < 5; i++) {
+ try {
+ inputStream = testAccount.getStore().retrieve("WASB_String.txt");
count = 0;
int c = 0;
@@ -173,17 +172,17 @@ public class TestAzureConcurrentOutOfBandIo {
// Counting the number of bytes.
count += c;
}
- } catch (IOException e) {
- System.out.println(e.getCause().toString());
- e.printStackTrace();
- fail();
- }
-
- // Close the stream.
- if (null != inputStream) {
- inputStream.close();
- }
- }
+ } catch (IOException e) {
+ System.out.println(e.getCause().toString());
+ e.printStackTrace();
+ fail();
+ }
+
+ // Close the stream.
+ if (null != inputStream){
+ inputStream.close();
+ }
+ }
// Stop writing blocks.
writeBlockTask.stopWriting();
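The startWriting()/stopWriting() calls above belong to the DataBlockWriter wrapper whose run() method appears earlier in this diff; its overall shape is roughly the following sketch. Only the volatile done flag consulted by run() is visible in the hunk, so the thread handling shown here is an assumed implementation, not the committed one.

  // Hedged sketch of the out-of-band writer wrapper used by this test.
  class DataBlockWriter implements Runnable {
    private volatile boolean done = false;   // run() loops while !done
    private Thread runner;
    // (the writerStorageAccount and key fields used by run() are omitted)

    void startWriting() {
      runner = new Thread(this);             // start overwriting out of band
      runner.start();
    }

    void stopWriting() {
      done = true;                           // ask run() to finish its loop
      try {
        runner.join();                       // wait for the writer to exit
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }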
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
index 6e89822..febb605 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
@@ -32,7 +32,6 @@ import java.util.Arrays;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext;
@@ -65,19 +64,18 @@ public class TestAzureFileSystemErrorConditions {
*/
@Test
public void testAccessUnauthorizedPublicContainer() throws Exception {
- Configuration conf = new Configuration();
- AzureBlobStorageTestAccount.addWasbToConfiguration(conf);
Path noAccessPath = new Path(
"wasb://nonExistentContainer@hopefullyNonExistentAccount/someFile");
NativeAzureFileSystem.suppressRetryPolicy();
try {
- FileSystem.get(noAccessPath.toUri(), conf).open(noAccessPath);
+ FileSystem.get(noAccessPath.toUri(), new Configuration())
+ .open(noAccessPath);
assertTrue("Should've thrown.", false);
} catch (AzureException ex) {
- assertTrue("Unexpected message in exception " + ex, ex.getMessage()
- .contains(
- "Unable to access container nonExistentContainer in account"
- + " hopefullyNonExistentAccount"));
+ assertTrue("Unexpected message in exception " + ex,
+ ex.getMessage().contains(
+ "Unable to access container nonExistentContainer in account" +
+ " hopefullyNonExistentAccount"));
} finally {
NativeAzureFileSystem.resumeRetryPolicy();
}
@@ -104,11 +102,11 @@ public class TestAzureFileSystemErrorConditions {
fs.listStatus(new Path("/"));
passed = true;
} catch (AzureException ex) {
- assertTrue("Unexpected exception message: " + ex, ex.getMessage()
- .contains("unsupported version: 2090-04-05."));
+ assertTrue("Unexpected exception message: " + ex,
+ ex.getMessage().contains("unsupported version: 2090-04-05."));
}
- assertFalse(
- "Should've thrown an exception because of the wrong version.", passed);
+ assertFalse("Should've thrown an exception because of the wrong version.",
+ passed);
} finally {
fs.close();
}
@@ -118,8 +116,7 @@ public class TestAzureFileSystemErrorConditions {
boolean isTargetConnection(HttpURLConnection connection);
}
- private class TransientErrorInjector extends
- StorageEvent<SendingRequestEvent> {
+ private class TransientErrorInjector extends StorageEvent<SendingRequestEvent> {
final ConnectionRecognizer connectionRecognizer;
private boolean injectedErrorOnce = false;
@@ -129,8 +126,7 @@ public class TestAzureFileSystemErrorConditions {
@Override
public void eventOccurred(SendingRequestEvent eventArg) {
- HttpURLConnection connection = (HttpURLConnection) eventArg
- .getConnectionObject();
+ HttpURLConnection connection = (HttpURLConnection)eventArg.getConnectionObject();
if (!connectionRecognizer.isTargetConnection(connection)) {
return;
}
@@ -157,8 +153,8 @@ public class TestAzureFileSystemErrorConditions {
@Test
public void testTransientErrorOnDelete() throws Exception {
// Need to do this test against a live storage account
- AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount
- .create();
+ AzureBlobStorageTestAccount testAccount =
+ AzureBlobStorageTestAccount.create();
assumeNotNull(testAccount);
try {
NativeAzureFileSystem fs = testAccount.getFileSystem();
@@ -179,7 +175,7 @@ public class TestAzureFileSystemErrorConditions {
private void writeAllThreeFile(NativeAzureFileSystem fs, Path testFile)
throws IOException {
byte[] buffer = new byte[ALL_THREE_FILE_SIZE];
- Arrays.fill(buffer, (byte) 3);
+ Arrays.fill(buffer, (byte)3);
OutputStream stream = fs.create(testFile);
stream.write(buffer);
stream.close();
@@ -189,7 +185,8 @@ public class TestAzureFileSystemErrorConditions {
throws IOException {
byte[] buffer = new byte[ALL_THREE_FILE_SIZE];
InputStream inStream = fs.open(testFile);
- assertEquals(buffer.length, inStream.read(buffer, 0, buffer.length));
+ assertEquals(buffer.length,
+ inStream.read(buffer, 0, buffer.length));
inStream.close();
for (int i = 0; i < buffer.length; i++) {
assertEquals(3, buffer[i]);
@@ -199,8 +196,8 @@ public class TestAzureFileSystemErrorConditions {
@Test
public void testTransientErrorOnCommitBlockList() throws Exception {
// Need to do this test against a live storage account
- AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount
- .create();
+ AzureBlobStorageTestAccount testAccount =
+ AzureBlobStorageTestAccount.create();
assumeNotNull(testAccount);
try {
NativeAzureFileSystem fs = testAccount.getFileSystem();
@@ -222,8 +219,8 @@ public class TestAzureFileSystemErrorConditions {
@Test
public void testTransientErrorOnRead() throws Exception {
// Need to do this test against a live storage account
- AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount
- .create();
+ AzureBlobStorageTestAccount testAccount =
+ AzureBlobStorageTestAccount.create();
assumeNotNull(testAccount);
try {
NativeAzureFileSystem fs = testAccount.getFileSystem();
@@ -240,16 +237,4 @@ public class TestAzureFileSystemErrorConditions {
testAccount.cleanup();
}
}
-
- // Tests an error during stream creation (in this case in the seek() implementation
- // to verify the close-stream-on-error logic.
- @Test (expected=AzureException.class)
- public void testErrorDuringRetrieve() throws Exception {
- NativeAzureFileSystem fs = AzureBlobStorageTestAccount.createMock().getFileSystem();
- Path testFile = new Path("/testErrorDuringRetrieve");
- writeAllThreeFile(fs, testFile);
-
- FSDataInputStream stream = fs.open(testFile);
- stream.seek(Integer.MAX_VALUE);
- }
}
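The ConnectionRecognizer/TransientErrorInjector pair above is the fault-injection hook behind the transient-error tests: a SendingRequestEvent listener inspects each outgoing HttpURLConnection and misbehaves exactly once for the first request the recognizer matches. As a hedged illustration, a recognizer aimed at the delete test could look like the sketch below; the request-method check is an assumption, only the interface comes from the diff.

  // Hedged sketch: match the HTTP DELETE that fs.delete() sends, so the
  // injector (not shown here) can fail that one request.
  ConnectionRecognizer deleteRecognizer = new ConnectionRecognizer() {
    @Override
    public boolean isTargetConnection(HttpURLConnection connection) {
      return "DELETE".equals(connection.getRequestMethod());
    }
  };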
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java
index b585c56..25bd338 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java
@@ -128,7 +128,7 @@ public class TestBlobDataValidation {
if (!expectMd5Stored) {
throw ex;
}
- StorageException cause = (StorageException) ex.getCause();
+ StorageException cause = (StorageException)ex.getCause();
assertNotNull(cause);
assertTrue("Unexpected cause: " + cause,
cause.getErrorCode().equals(StorageErrorCodeStrings.INVALID_MD5));
@@ -212,13 +212,13 @@ public class TestBlobDataValidation {
// validate the data as expected, but the HttpURLConnection wasn't
// pluggable enough for me to do that.
testAccount.getFileSystem().getStore()
- .addTestHookToOperationContext(new TestHookOperationContext() {
- @Override
+ .addTestHookToOperationContext(new TestHookOperationContext() {
+ @Override
public OperationContext modifyOperationContext(
OperationContext original) {
- original.getResponseReceivedEventHandler().addListener(
- new ContentMD5Checker(expectMd5Checked));
- return original;
+ original.getResponseReceivedEventHandler().addListener(
+ new ContentMD5Checker(expectMd5Checked));
+ return original;
}
});
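The hook above replaces the OperationContext so that a ContentMD5Checker listens to every response the storage client receives. The checker itself is not part of this hunk; a minimal sketch of such a listener is shown below, with the class body being an assumption and only the listener-registration pattern taken from the diff.

  // Hedged sketch of a response listener in the style of ContentMD5Checker:
  // look at the Content-MD5 header on each storage response.
  private static class ContentMD5Checker
      extends StorageEvent<ResponseReceivedEvent> {
    private final boolean expectMd5;

    ContentMD5Checker(boolean expectMd5) {
      this.expectMd5 = expectMd5;
    }

    @Override
    public void eventOccurred(ResponseReceivedEvent eventArg) {
      HttpURLConnection connection =
          (HttpURLConnection) eventArg.getConnectionObject();
      String md5 = connection.getHeaderField("Content-MD5");
      if (expectMd5) {
        assertNotNull("expected a Content-MD5 header", md5);
      }
    }
  }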
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java
index b75fc38..6c49926 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java
@@ -69,7 +69,8 @@ public class TestBlobMetadata {
throws Exception {
return String.format(
"{\"owner\":\"%s\",\"group\":\"%s\",\"permissions\":\"%s\"}",
- getExpectedOwner(), NativeAzureFileSystem.AZURE_DEFAULT_GROUP_DEFAULT,
+ getExpectedOwner(),
+ NativeAzureFileSystem.AZURE_DEFAULT_GROUP_DEFAULT,
permissionString);
}
@@ -80,8 +81,8 @@ public class TestBlobMetadata {
public void testContainerVersionMetadata() throws Exception {
// Do a write operation to trigger version stamp
fs.createNewFile(new Path("/foo"));
- HashMap<String, String> containerMetadata = backingStore
- .getContainerMetadata();
+ HashMap<String, String> containerMetadata =
+ backingStore.getContainerMetadata();
assertNotNull(containerMetadata);
assertEquals(AzureNativeFileSystemStore.CURRENT_WASB_VERSION,
containerMetadata.get(AzureNativeFileSystemStore.VERSION_METADATA_KEY));
@@ -226,26 +227,32 @@ public class TestBlobMetadata {
@Test
public void testOldPermissionMetadata() throws Exception {
Path selfishFile = new Path("/noOneElse");
- HashMap<String, String> metadata = new HashMap<String, String>();
- metadata.put("asv_permission", getExpectedPermissionString("rw-------"));
- backingStore.setContent(AzureBlobStorageTestAccount.toMockUri(selfishFile),
- new byte[] {}, metadata);
- FsPermission justMe = new FsPermission(FsAction.READ_WRITE, FsAction.NONE,
- FsAction.NONE);
+ HashMap<String, String> metadata =
+ new HashMap<String, String>();
+ metadata.put("asv_permission",
+ getExpectedPermissionString("rw-------"));
+ backingStore.setContent(
+ AzureBlobStorageTestAccount.toMockUri(selfishFile),
+ new byte[] { },
+ metadata, false, 0);
+ FsPermission justMe = new FsPermission(
+ FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE);
FileStatus retrievedStatus = fs.getFileStatus(selfishFile);
assertNotNull(retrievedStatus);
assertEquals(justMe, retrievedStatus.getPermission());
assertEquals(getExpectedOwner(), retrievedStatus.getOwner());
assertEquals(NativeAzureFileSystem.AZURE_DEFAULT_GROUP_DEFAULT,
retrievedStatus.getGroup());
- FsPermission meAndYou = new FsPermission(FsAction.READ_WRITE,
- FsAction.READ_WRITE, FsAction.NONE);
+ FsPermission meAndYou = new FsPermission(
+ FsAction.READ_WRITE, FsAction.READ_WRITE, FsAction.NONE);
fs.setPermission(selfishFile, meAndYou);
- metadata = backingStore.getMetadata(AzureBlobStorageTestAccount
- .toMockUri(selfishFile));
+ metadata =
+ backingStore.getMetadata(
+ AzureBlobStorageTestAccount.toMockUri(selfishFile));
assertNotNull(metadata);
String storedPermission = metadata.get("hdi_permission");
- assertEquals(getExpectedPermissionString("rw-rw----"), storedPermission);
+ assertEquals(getExpectedPermissionString("rw-rw----"),
+ storedPermission);
assertNull(metadata.get("asv_permission"));
}
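For reference, the permission metadata exercised here is a single JSON object built by getExpectedPermissionString(); after the setPermission call above, the value stored under the hdi_permission key has the shape below, where the owner and group come from getExpectedOwner() and NativeAzureFileSystem.AZURE_DEFAULT_GROUP_DEFAULT (placeholders shown here, not literal values):

  {"owner":"<current user>","group":"<default group>","permissions":"rw-rw----"}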
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java
new file mode 100644
index 0000000..afb16ef
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
+
+import junit.framework.*;
+
+import org.junit.Test;
+
+
+/**
+ * A simple benchmark to find out the difference in speed between block
+ * and page blobs.
+ */
+public class TestBlobTypeSpeedDifference extends TestCase {
+ /**
+ * Writes data of the given size to the given stream, flushing every
+ * flushInterval bytes.
+ */
+ private static void writeTestFile(OutputStream writeStream,
+ long size, long flushInterval) throws IOException {
+ int bufferSize = (int) Math.min(1000, flushInterval);
+ byte[] buffer = new byte[bufferSize];
+ Arrays.fill(buffer, (byte) 7);
+ int bytesWritten = 0;
+ int bytesUnflushed = 0;
+ while (bytesWritten < size) {
+ int numberToWrite = (int) Math.min(bufferSize, size - bytesWritten);
+ writeStream.write(buffer, 0, numberToWrite);
+ bytesWritten += numberToWrite;
+ bytesUnflushed += numberToWrite;
+ if (bytesUnflushed >= flushInterval) {
+ writeStream.flush();
+ bytesUnflushed = 0;
+ }
+ }
+ }
+
+ private static class TestResult {
+ final long timeTakenInMs;
+ final long totalNumberOfRequests;
+
+ TestResult(long timeTakenInMs, long totalNumberOfRequests) {
+ this.timeTakenInMs = timeTakenInMs;
+ this.totalNumberOfRequests = totalNumberOfRequests;
+ }
+ }
+
+ /**
+ * Writes data of the given size to the given file, flushing every
+ * flushInterval bytes, and measures and returns the performance.
+ */
+ private static TestResult writeTestFile(NativeAzureFileSystem fs, Path path,
+ long size, long flushInterval) throws IOException {
+ AzureFileSystemInstrumentation instrumentation =
+ fs.getInstrumentation();
+ long initialRequests = instrumentation.getCurrentWebResponses();
+ Date start = new Date();
+ OutputStream output = fs.create(path);
+ writeTestFile(output, size, flushInterval);
+ output.close();
+ long finalRequests = instrumentation.getCurrentWebResponses();
+ return new TestResult(new Date().getTime() - start.getTime(),
+ finalRequests - initialRequests);
+ }
+
+ /**
+ * Writes data of the given size to a block blob, flushing every
+ * flushInterval bytes, and measures and returns the performance.
+ */
+ private static TestResult writeBlockBlobTestFile(NativeAzureFileSystem fs,
+ long size, long flushInterval) throws IOException {
+ return writeTestFile(fs, new Path("/blockBlob"), size, flushInterval);
+ }
+
+ /**
+ * Writes data of the given size to a page blob, flushing every
+ * flushInterval bytes, and measures and returns the performance.
+ */
+ private static TestResult writePageBlobTestFile(NativeAzureFileSystem fs,
+ long size, long flushInterval) throws IOException {
+ return writeTestFile(fs,
+ AzureBlobStorageTestAccount.pageBlobPath("pageBlob"),
+ size, flushInterval);
+ }
+
+ /**
+ * Runs the benchmark over a small 10 KB file, flushing every 500 bytes.
+ */
+ @Test
+ public void testTenKbFileFrequentFlush() throws Exception {
+ AzureBlobStorageTestAccount testAccount =
+ AzureBlobStorageTestAccount.create();
+ if (testAccount == null) {
+ return;
+ }
+ try {
+ testForSizeAndFlushInterval(testAccount.getFileSystem(), 10 * 1000, 500);
+ } finally {
+ testAccount.cleanup();
+ }
+ }
+
+ /**
+ * Runs the benchmark for the given file size and flush frequency.
+ */
+ private static void testForSizeAndFlushInterval(NativeAzureFileSystem fs,
+ final long size, final long flushInterval) throws IOException {
+ for (int i = 0; i < 5; i++) {
+ TestResult pageBlobResults = writePageBlobTestFile(fs, size, flushInterval);
+ System.out.printf(
+ "Page blob upload took %d ms. Total number of requests: %d.\n",
+ pageBlobResults.timeTakenInMs, pageBlobResults.totalNumberOfRequests);
+ TestResult blockBlobResults = writeBlockBlobTestFile(fs, size, flushInterval);
+ System.out.printf(
+ "Block blob upload took %d ms. Total number of requests: %d.\n",
+ blockBlobResults.timeTakenInMs, blockBlobResults.totalNumberOfRequests);
+ }
+ }
+
+ /**
+ * Runs the benchmark for the given file size and flush frequency from the
+ * command line.
+ */
+ public static void main(String argv[]) throws Exception {
+ Configuration conf = new Configuration();
+ long size = 10 * 1000 * 1000;
+ long flushInterval = 2000;
+ if (argv.length > 0) {
+ size = Long.parseLong(argv[0]);
+ }
+ if (argv.length > 1) {
+ flushInterval = Long.parseLong(argv[1]);
+ }
+ testForSizeAndFlushInterval((NativeAzureFileSystem)FileSystem.get(conf),
+ size, flushInterval);
+ }
+}
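Besides running under JUnit, the benchmark's main() accepts an optional size in bytes (argv[0], default 10,000,000) and flush interval in bytes (argv[1], default 2,000). Assuming the test classes and a WASB default filesystem are on the Hadoop classpath, an invocation would look roughly like this (classpath setup is environment-specific and not part of the patch):

  hadoop org.apache.hadoop.fs.azure.TestBlobTypeSpeedDifference 50000000 4000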
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFSPageBlobLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFSPageBlobLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFSPageBlobLive.java
new file mode 100644
index 0000000..208cff3
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFSPageBlobLive.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run the base Azure file system tests strictly on page blobs to make sure fundamental
+ * operations on page blob files and folders work as expected.
+ * These operations include create, delete, rename, list, and so on.
+ */
+public class TestNativeAzureFSPageBlobLive extends
+ NativeAzureFileSystemBaseTest {
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount()
+ throws Exception {
+ Configuration conf = new Configuration();
+
+ // Configure the page blob directories key so every file created is a page blob.
+ conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+ // Configure the atomic rename directories key so every folder will have
+ // atomic rename applied.
+ conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+ return AzureBlobStorageTestAccount.create(conf);
+ }
+}
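Outside of this test, the same behavior is driven by the configuration properties behind KEY_PAGE_BLOB_DIRECTORIES and KEY_ATOMIC_RENAME_DIRECTORIES, normally scoped to specific folders rather than "/". A hedged sketch of programmatic configuration, with the directory values chosen purely for illustration:

  // Hedged sketch: limit page blobs and atomic rename to chosen folders.
  Configuration conf = new Configuration();
  conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/writeAheadLogs");
  conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/writeAheadLogs");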