Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2014/10/08 23:24:41 UTC
[4/5] HADOOP-10809. hadoop-azure: page blob support. Contributed by
Dexter Bradshaw, Mostafa Elhemali, Eric Hanson, and Mike Liddell.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index dae957e..076c48a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -25,14 +25,21 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Calendar;
import java.util.Date;
+import java.util.EnumSet;
+import java.util.Iterator;
import java.util.Set;
+import java.util.TimeZone;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,6 +47,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
@@ -50,12 +58,26 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.fs.azure.AzureException;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
import com.google.common.annotations.VisibleForTesting;
-import com.microsoft.windowsazure.storage.core.Utility;
+import com.microsoft.windowsazure.storage.AccessCondition;
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+import com.microsoft.windowsazure.storage.core.*;
/**
* <p>
@@ -68,6 +90,495 @@ import com.microsoft.windowsazure.storage.core.Utility;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class NativeAzureFileSystem extends FileSystem {
+ private static final int USER_WX_PERMISSION = 0300;
+
+ /**
+ * A description of a folder rename operation, including the source and
+ * destination keys, and descriptions of the files in the source folder.
+ */
+ public static class FolderRenamePending {
+ private SelfRenewingLease folderLease;
+ private String srcKey;
+ private String dstKey;
+ private FileMetadata[] fileMetadata = null; // descriptions of source files
+ private ArrayList<String> fileStrings = null;
+ private NativeAzureFileSystem fs;
+ private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000;
+ private static final int FORMATTING_BUFFER = 10000;
+ private boolean committed;
+ public static final String SUFFIX = "-RenamePending.json";
+
+ // Prepare in-memory information needed to do or redo a folder rename.
+ public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease,
+ NativeAzureFileSystem fs) throws IOException {
+ this.srcKey = srcKey;
+ this.dstKey = dstKey;
+ this.folderLease = lease;
+ this.fs = fs;
+ ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
+
+ // List all the files in the folder.
+ String priorLastKey = null;
+ do {
+ PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL,
+ AZURE_UNBOUNDED_DEPTH, priorLastKey);
+ for(FileMetadata file : listing.getFiles()) {
+ fileMetadataList.add(file);
+ }
+ priorLastKey = listing.getPriorLastKey();
+ } while (priorLastKey != null);
+ fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
+ this.committed = true;
+ }
+
+ // Prepare in-memory information needed to do or redo folder rename from
+ // a -RenamePending.json file read from storage. This constructor is used
+ // during redo processing.
+ public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs)
+ throws IllegalArgumentException, IOException {
+
+ this.fs = fs;
+
+ // open redo file
+ Path f = redoFile;
+ FSDataInputStream input = fs.open(f);
+ byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE];
+ int l = input.read(bytes);
+ if (l < 0) {
+ throw new IOException(
+ "Error reading pending rename file contents -- no data available");
+ }
+ if (l == MAX_RENAME_PENDING_FILE_SIZE) {
+ throw new IOException(
+ "Error reading pending rename file contents -- "
+ + "maximum file size exceeded");
+ }
+ String contents = new String(bytes, 0, l);
+
+ // parse the JSON
+ ObjectMapper objMapper = new ObjectMapper();
+ objMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+ JsonNode json = null;
+ try {
+ json = objMapper.readValue(contents, JsonNode.class);
+ this.committed = true;
+ } catch (JsonMappingException e) {
+
+ // The -RenamePending.json file is corrupted, so we assume it was
+ // not completely written and the redo operation did not commit.
+ this.committed = false;
+ } catch (JsonParseException e) {
+ this.committed = false;
+ } catch (IOException e) {
+ this.committed = false;
+ }
+
+ if (!this.committed) {
+ LOG.error("Deleting corruped rename pending file "
+ + redoFile + "\n" + contents);
+
+ // delete the -RenamePending.json file
+ fs.delete(redoFile, false);
+ return;
+ }
+
+ // initialize this object's fields
+ ArrayList<String> fileStrList = new ArrayList<String>();
+ JsonNode oldFolderName = json.get("OldFolderName");
+ JsonNode newFolderName = json.get("NewFolderName");
+ if (oldFolderName == null || newFolderName == null) {
+ this.committed = false;
+ } else {
+ this.srcKey = oldFolderName.getTextValue();
+ this.dstKey = newFolderName.getTextValue();
+ if (this.srcKey == null || this.dstKey == null) {
+ this.committed = false;
+ } else {
+ JsonNode fileList = json.get("FileList");
+ if (fileList == null) {
+ this.committed = false;
+ } else {
+ for (int i = 0; i < fileList.size(); i++) {
+ fileStrList.add(fileList.get(i).getTextValue());
+ }
+ }
+ }
+ }
+ this.fileStrings = fileStrList;
+ }
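
As a standalone illustration of the lenient JSON parsing this constructor relies on, here is a minimal sketch (the demo class name and sample strings are hypothetical, not part of the patch):

    import org.codehaus.jackson.JsonNode;
    import org.codehaus.jackson.JsonParser;
    import org.codehaus.jackson.map.ObjectMapper;

    // Hypothetical demo: parse the unquoted-field-name JSON format used by
    // -RenamePending.json files, the same way the redo constructor above does.
    public class RenamePendingParseDemo {
      public static void main(String[] args) throws Exception {
        String contents =
            "{ FormatVersion: \"1.0\", OldFolderName: \"a/b\","
            + " NewFolderName: \"a/c\", FileList: [ \"f1\", \"f2\" ] }";
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
        JsonNode json = mapper.readValue(contents, JsonNode.class);
        System.out.println(json.get("OldFolderName").getTextValue());   // a/b
        System.out.println(json.get("FileList").get(0).getTextValue()); // f1
      }
    }
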
+
+ public FileMetadata[] getFiles() {
+ return fileMetadata;
+ }
+
+ public SelfRenewingLease getFolderLease() {
+ return folderLease;
+ }
+
+ /**
+ * Write to disk the information needed to redo folder rename, in JSON format.
+ * The file name will be wasb://<sourceFolderPrefix>/folderName-RenamePending.json
+ * The file format will be:
+ * {
+ * FormatVersion: "1.0",
+ * OperationUTCTime: "<YYYY-MM-DD HH:MM:SS.MMM>",
+ * OldFolderName: "<key>",
+ * NewFolderName: "<key>",
+ * FileList: [ <string> , <string> , ... ]
+ * }
+ *
+ * Here's a sample:
+ * {
+ * FormatVersion: "1.0",
+ * OperationUTCTime: "2014-07-01 23:50:35.572",
+ * OldFolderName: "user/ehans/folderToRename",
+ * NewFolderName: "user/ehans/renamedFolder",
+ * FileList: [
+ * "innerFile",
+ * "innerFile2"
+ * ]
+ * }
+ * @throws IOException
+ */
+ public void writeFile(FileSystem fs) throws IOException {
+ Path path = getRenamePendingFilePath();
+ if (LOG.isDebugEnabled()){
+ LOG.debug("Preparing to write atomic rename state to " + path.toString());
+ }
+ OutputStream output = null;
+
+ String contents = makeRenamePendingFileContents();
+
+ // Write file.
+ try {
+ output = fs.create(path);
+ output.write(contents.getBytes());
+ } catch (IOException e) {
+ throw new IOException("Unable to write RenamePending file for folder rename from "
+ + srcKey + " to " + dstKey, e);
+ } finally {
+ IOUtils.cleanup(LOG, output);
+ }
+ }
+
+ /**
+ * Return the contents of the JSON file to represent the operations
+ * to be performed for a folder rename.
+ */
+ public String makeRenamePendingFileContents() {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+ String time = sdf.format(new Date());
+
+ // Make file list string
+ StringBuilder builder = new StringBuilder();
+ builder.append("[\n");
+ for (int i = 0; i != fileMetadata.length; i++) {
+ if (i > 0) {
+ builder.append(",\n");
+ }
+ builder.append(" ");
+ String noPrefix = StringUtils.removeStart(fileMetadata[i].getKey(), srcKey + "/");
+
+ // Quote string file names, escaping any possible " characters or other
+ // necessary characters in the name.
+ builder.append(quote(noPrefix));
+ if (builder.length() >=
+ MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) {
+
+ // Give up now to avoid using too much memory.
+ LOG.error("Internal error: Exceeded maximum rename pending file size of "
+ + MAX_RENAME_PENDING_FILE_SIZE + " bytes.");
+
+ // return some bad JSON with an error message to make it human readable
+ return "exceeded maximum rename pending file size";
+ }
+ }
+ builder.append("\n ]");
+ String fileList = builder.toString();
+
+ // Make file contents as a string. Again, quote file names, escaping
+ // characters as appropriate.
+ String contents = "{\n"
+ + " FormatVersion: \"1.0\",\n"
+ + " OperationUTCTime: \"" + time + "\",\n"
+ + " OldFolderName: " + quote(srcKey) + ",\n"
+ + " NewFolderName: " + quote(dstKey) + ",\n"
+ + " FileList: " + fileList + "\n"
+ + "}\n";
+
+ return contents;
+ }
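
For reference, a minimal standalone sketch of the UTC timestamp written into the OperationUTCTime field (the demo class name is hypothetical):

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    // Hypothetical demo: format the current time exactly as
    // makeRenamePendingFileContents() does above.
    public class RenamePendingTimeDemo {
      public static void main(String[] args) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        System.out.println(sdf.format(new Date())); // e.g. 2014-07-01 23:50:35.572
      }
    }
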
+
+ /**
+ * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote
+ * method.
+ *
+ * Produce a string in double quotes with backslash sequences in all the
+ * right places. A backslash will be inserted within </, allowing JSON
+ * text to be delivered in HTML. In JSON text, a string cannot contain a
+ * control character or an unescaped quote or backslash.
+ * @param string A String
+ * @return A String correctly formatted for insertion in a JSON text.
+ */
+ private String quote(String string) {
+ if (string == null || string.length() == 0) {
+ return "\"\"";
+ }
+
+ char c = 0;
+ int i;
+ int len = string.length();
+ StringBuilder sb = new StringBuilder(len + 4);
+ String t;
+
+ sb.append('"');
+ for (i = 0; i < len; i += 1) {
+ c = string.charAt(i);
+ switch (c) {
+ case '\\':
+ case '"':
+ sb.append('\\');
+ sb.append(c);
+ break;
+ case '/':
+ sb.append('\\');
+ sb.append(c);
+ break;
+ case '\b':
+ sb.append("\\b");
+ break;
+ case '\t':
+ sb.append("\\t");
+ break;
+ case '\n':
+ sb.append("\\n");
+ break;
+ case '\f':
+ sb.append("\\f");
+ break;
+ case '\r':
+ sb.append("\\r");
+ break;
+ default:
+ if (c < ' ') {
+ t = "000" + Integer.toHexString(c);
+ sb.append("\\u" + t.substring(t.length() - 4));
+ } else {
+ sb.append(c);
+ }
+ }
+ }
+ sb.append('"');
+ return sb.toString();
+ }
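
A few illustrative input/output pairs for this escaping (hypothetical examples; the outer double quotes are part of the returned string):

    // quote(null)        -> ""
    // quote("innerFile") -> "innerFile"
    // quote("a\"b")      -> "a\"b"        (embedded quote backslash-escaped)
    // quote("dir/file")  -> "dir\/file"   (slash escaped so JSON is HTML-safe)
    // Tab, newline, and other control characters come out as \t, \n, or \uXXXX.
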
+
+ public String getSrcKey() {
+ return srcKey;
+ }
+
+ public String getDstKey() {
+ return dstKey;
+ }
+
+ public FileMetadata getSourceMetadata() throws IOException {
+ return fs.getStoreInterface().retrieveMetadata(srcKey);
+ }
+
+ /**
+ * Execute a folder rename. This is the execution path followed
+ * when everything is working normally. See redo() for the alternate
+ * execution path for the case where we're recovering from a folder rename
+ * failure.
+ * @throws IOException
+ */
+ public void execute() throws IOException {
+
+ for (FileMetadata file : this.getFiles()) {
+
+ // Rename all materialized entries under the folder to point to the
+ // final destination.
+ if (file.getBlobMaterialization() == BlobMaterialization.Explicit) {
+ String srcName = file.getKey();
+ String suffix = srcName.substring((this.getSrcKey()).length());
+ String dstName = this.getDstKey() + suffix;
+
+ // Rename gets exclusive access (via a lease) for files
+ // designated for atomic rename.
+ // The main use case is for HBase write-ahead log (WAL) and data
+ // folder processing correctness. See the rename code for details.
+ boolean acquireLease = fs.getStoreInterface().isAtomicRenameKey(srcName);
+ fs.getStoreInterface().rename(srcName, dstName, acquireLease, null);
+ }
+ }
+
+ // Rename the source folder 0-byte root file itself.
+ FileMetadata srcMetadata2 = this.getSourceMetadata();
+ if (srcMetadata2.getBlobMaterialization() ==
+ BlobMaterialization.Explicit) {
+
+ // It already has a lease on it from the "prepare" phase so there's no
+ // need to get one now. Pass in existing lease to allow file delete.
+ fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(),
+ false, folderLease);
+ }
+
+ // Update the last-modified time of the parent folders of both source and
+ // destination.
+ fs.updateParentFolderLastModifiedTime(srcKey);
+ fs.updateParentFolderLastModifiedTime(dstKey);
+ }
+
+ /** Clean up after execution of rename.
+ * @throws IOException */
+ public void cleanup() throws IOException {
+
+ if (fs.getStoreInterface().isAtomicRenameKey(srcKey)) {
+
+ // Remove RenamePending file
+ fs.delete(getRenamePendingFilePath(), false);
+
+ // Freeing source folder lease is not necessary since the source
+ // folder file was deleted.
+ }
+ }
+
+ private Path getRenamePendingFilePath() {
+ String fileName = srcKey + SUFFIX;
+ Path fileNamePath = keyToPath(fileName);
+ Path path = fs.makeAbsolute(fileNamePath);
+ return path;
+ }
+
+ /**
+ * Recover from a folder rename failure by redoing the intended work,
+ * as recorded in the -RenamePending.json file.
+ *
+ * @throws IOException
+ */
+ public void redo() throws IOException {
+
+ if (!committed) {
+
+ // Nothing to do. The -RenamePending.json file should have already been
+ // deleted.
+ return;
+ }
+
+ // Try to get a lease on source folder to block concurrent access to it.
+ // It may fail if the folder is already gone. We don't check if the
+ // source exists explicitly because that could recursively trigger redo
+ // and give an infinite recursion.
+ SelfRenewingLease lease = null;
+ boolean sourceFolderGone = false;
+ try {
+ lease = fs.leaseSourceFolder(srcKey);
+ } catch (AzureException e) {
+
+ // If the source folder was not found then somebody probably
+ // raced with us and finished the rename first, or the
+ // first rename failed right before deleting the rename pending
+ // file.
+ String errorCode = "";
+ try {
+ StorageException se = (StorageException) e.getCause();
+ errorCode = se.getErrorCode();
+ } catch (Exception e2) {
+ ; // do nothing -- could not get errorCode
+ }
+ if (errorCode.equals("BlobNotFound")) {
+ sourceFolderGone = true;
+ } else {
+ throw new IOException(
+ "Unexpected error when trying to lease source folder name during "
+ + "folder rename redo",
+ e);
+ }
+ }
+
+ if (!sourceFolderGone) {
+ // Make sure the target folder exists.
+ Path dst = fullPath(dstKey);
+ if (!fs.exists(dst)) {
+ fs.mkdirs(dst);
+ }
+
+ // For each file inside the folder to be renamed,
+ // make sure it has been renamed.
+ for(String fileName : fileStrings) {
+ finishSingleFileRename(fileName);
+ }
+
+ // Remove the source folder. Don't check explicitly if it exists,
+ // to avoid triggering redo recursively.
+ try {
+ fs.getStoreInterface().delete(srcKey, lease);
+ } catch (Exception e) {
+ LOG.info("Unable to delete source folder during folder rename redo. "
+ + "If the source folder is already gone, this is not an error "
+ + "condition. Continuing with redo.", e);
+ }
+
+ // Update the last-modified time of the parent folders of both source
+ // and destination.
+ fs.updateParentFolderLastModifiedTime(srcKey);
+ fs.updateParentFolderLastModifiedTime(dstKey);
+ }
+
+ // Remove the -RenamePending.json file.
+ fs.delete(getRenamePendingFilePath(), false);
+ }
+
+ // See if the source file is still there, and if it is, rename it.
+ private void finishSingleFileRename(String fileName)
+ throws IOException {
+ Path srcFile = fullPath(srcKey, fileName);
+ Path dstFile = fullPath(dstKey, fileName);
+ boolean srcExists = fs.exists(srcFile);
+ boolean dstExists = fs.exists(dstFile);
+ if (srcExists && !dstExists) {
+
+ // Rename gets exclusive access (via a lease) for HBase write-ahead log
+ // (WAL) file processing correctness. See the rename code for details.
+ String srcName = fs.pathToKey(srcFile);
+ String dstName = fs.pathToKey(dstFile);
+ fs.getStoreInterface().rename(srcName, dstName, true, null);
+ } else if (srcExists && dstExists) {
+
+ // Get a lease on source to block write access.
+ String srcName = fs.pathToKey(srcFile);
+ SelfRenewingLease lease = fs.acquireLease(srcFile);
+
+ // Delete the file. This will free the lease too.
+ fs.getStoreInterface().delete(srcName, lease);
+ } else if (!srcExists && dstExists) {
+
+ // The rename already finished, so do nothing.
+ ;
+ } else {
+ throw new IOException(
+ "Attempting to complete rename of file " + srcKey + "/" + fileName
+ + " during folder rename redo, and file was not found in source "
+ + "or destination.");
+ }
+ }
+
+ // Return an absolute path for the specific fileName within the folder
+ // specified by folderKey.
+ private Path fullPath(String folderKey, String fileName) {
+ return new Path(new Path(fs.getUri()), "/" + folderKey + "/" + fileName);
+ }
+
+ private Path fullPath(String fileKey) {
+ return new Path(new Path(fs.getUri()), "/" + fileKey);
+ }
+ }
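
The class above is driven from rename() roughly as follows, a minimal sketch assuming srcKey, dstKey, lease, and fs are in scope (see prepareAtomicFolderRename() later in this patch for the real call site):

    // 1. Record the plan while holding a lease on the source folder.
    // 2. Persist the plan as <srcKey>-RenamePending.json.
    // 3. Rename each inner file, then the folder's 0-byte root blob.
    // 4. Delete the pending file on success.
    // If the process dies between steps 2 and 4, a later listing finds the
    // pending file and calls redo() to finish the rename.
    FolderRenamePending pending =
        new FolderRenamePending(srcKey, dstKey, lease, fs);
    pending.writeFile(fs);
    pending.execute();
    pending.cleanup();
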
+
+ private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]";
+ private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN =
+ Pattern.compile("\\[\\[\\.\\]\\](?=$|/)");
+ private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)");
@Override
public String getScheme() {
@@ -121,17 +632,53 @@ public class NativeAzureFileSystem extends FileSystem {
*/
static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup";
- static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost";
- private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
+ static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME =
+ "fs.azure.block.location.impersonatedhost";
+ private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT =
+ "localhost";
+ static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME =
+ "fs.azure.ring.buffer.capacity";
+ static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME =
+ "fs.azure.output.stream.buffer.size";
private class NativeAzureFsInputStream extends FSInputStream {
private InputStream in;
private final String key;
private long pos = 0;
+ private boolean closed = false;
+ private boolean isPageBlob;
- public NativeAzureFsInputStream(DataInputStream in, String key) {
+ // File length, valid only for streams over block blobs.
+ private long fileLength;
+
+ public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) {
this.in = in;
this.key = key;
+ this.isPageBlob = store.isPageBlobKey(key);
+ this.fileLength = fileLength;
+ }
+
+ /**
+ * Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
+ *
+ * This is to match the behavior of DFSInputStream.available(),
+ * which some clients may rely on (HBase write-ahead log reading in
+ * particular).
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ if (isPageBlob) {
+ return in.available();
+ } else {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ final long remaining = this.fileLength - pos;
+ return remaining <= Integer.MAX_VALUE ?
+ (int) remaining : Integer.MAX_VALUE;
+ }
}
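
A hedged usage sketch of the semantics above (fs is a NativeAzureFileSystem instance and the path is illustrative):

    // For a block blob stream, available() reports the remaining bytes
    // (fileLength - pos) capped at Integer.MAX_VALUE, matching the
    // DFSInputStream.available() behavior that HBase WAL reading relies on.
    FSDataInputStream in = fs.open(new Path("/hbase/wal-0001"));
    int remaining = in.available();
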
/*
@@ -140,7 +687,7 @@ public class NativeAzureFileSystem extends FileSystem {
* because the end of the stream has been reached, the value -1 is returned.
* This method blocks until input data is available, the end of the stream
* is detected, or an exception is thrown.
- *
+ *
* @return int An integer corresponding to the byte read.
*/
@Override
@@ -169,13 +716,13 @@ public class NativeAzureFileSystem extends FileSystem {
* one byte. If no byte is available because the stream is at end of file,
* the value -1 is returned; otherwise, at least one byte is read and stored
* into b.
- *
+ *
* @param b -- the buffer into which data is read
- *
+ *
* @param off -- the start offset in the array b at which data is written
- *
+ *
* @param len -- the maximum number of bytes read
- *
+ *
* @return int The total number of bytes read into the buffer, or -1 if
* there is no more data because the end of stream is reached.
*/
@@ -196,15 +743,20 @@ public class NativeAzureFileSystem extends FileSystem {
}
@Override
- public synchronized void close() throws IOException {
+ public void close() throws IOException {
in.close();
+ closed = true;
}
@Override
public synchronized void seek(long pos) throws IOException {
- in.close();
- in = store.retrieve(key, pos);
- this.pos = pos;
+ in.close();
+ in = store.retrieve(key);
+ this.pos = in.skip(pos);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Seek to position %d. Bytes skipped %d", pos,
+ this.pos));
+ }
}
@Override
@@ -468,7 +1020,8 @@ public class NativeAzureFileSystem extends FileSystem {
}
@Override
- public void initialize(URI uri, Configuration conf) throws IOException {
+ public void initialize(URI uri, Configuration conf)
+ throws IOException, IllegalArgumentException {
// Check authority for the URI to guarantee that it is non-null.
uri = reconstructAuthorityIfNeeded(uri, conf);
if (null == uri.getAuthority()) {
@@ -514,10 +1067,29 @@ public class NativeAzureFileSystem extends FileSystem {
return actualStore;
}
- // Note: The logic for this method is confusing as to whether it strips the
- // last slash or not (it adds it in the beginning, then strips it at the end).
- // We should revisit that.
- private String pathToKey(Path path) {
+ /**
+ * Azure Storage doesn't allow the blob names to end in a period,
+ * so encode this here to work around that limitation.
+ */
+ private static String encodeTrailingPeriod(String toEncode) {
+ Matcher matcher = TRAILING_PERIOD_PATTERN.matcher(toEncode);
+ return matcher.replaceAll(TRAILING_PERIOD_PLACEHOLDER);
+ }
+
+ /**
+ * Reverse the encoding done by encodeTrailingPeriod().
+ */
+ private static String decodeTrailingPeriod(String toDecode) {
+ Matcher matcher = TRAILING_PERIOD_PLACEHOLDER_PATTERN.matcher(toDecode);
+ return matcher.replaceAll(".");
+ }
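
A standalone sketch of the round trip (the demo class name and sample key are hypothetical):

    import java.util.regex.Pattern;

    // Hypothetical demo: a '.' at the end of a path segment is replaced by
    // the "[[.]]" placeholder before use as a blob name, and restored on decode.
    public class TrailingPeriodDemo {
      private static final Pattern ENCODE = Pattern.compile("\\.(?=$|/)");
      private static final Pattern DECODE =
          Pattern.compile("\\[\\[\\.\\]\\](?=$|/)");

      public static void main(String[] args) {
        String key = "user/data./file.";
        String encoded = ENCODE.matcher(key).replaceAll("[[.]]");
        System.out.println(encoded); // user/data[[.]]/file[[.]]
        System.out.println(DECODE.matcher(encoded).replaceAll(".")); // user/data./file.
      }
    }
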
+
+ /**
+ * Convert the path to a key. By convention, any leading or trailing slash is
+ * removed, except for the special case of a single slash.
+ */
+ @VisibleForTesting
+ public String pathToKey(Path path) {
// Convert the path to a URI to parse the scheme, the authority, and the
// path from the path object.
URI tmpUri = path.toUri();
@@ -537,6 +1109,8 @@ public class NativeAzureFileSystem extends FileSystem {
String key = null;
key = newPath.toUri().getPath();
+ key = removeTrailingSlash(key);
+ key = encodeTrailingPeriod(key);
if (key.length() == 1) {
return key;
} else {
@@ -544,14 +1118,34 @@ public class NativeAzureFileSystem extends FileSystem {
}
}
+ // Remove any trailing slash except for the case of a single slash.
+ private static String removeTrailingSlash(String key) {
+ if (key.length() == 0 || key.length() == 1) {
+ return key;
+ }
+ if (key.charAt(key.length() - 1) == '/') {
+ return key.substring(0, key.length() - 1);
+ } else {
+ return key;
+ }
+ }
+
private static Path keyToPath(String key) {
if (key.equals("/")) {
return new Path("/"); // container
}
- return new Path("/" + key);
+ return new Path("/" + decodeTrailingPeriod(key));
}
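
Putting these helpers together, some illustrative conversions (hypothetical values):

    // pathToKey(new Path("/"))     -> "/"         (container root unchanged)
    // pathToKey(new Path("/a/b/")) -> "a/b"       (leading and trailing '/' dropped)
    // pathToKey(new Path("/a/b.")) -> "a/b[[.]]"  (trailing period encoded)
    // keyToPath("a/b[[.]]")        -> /a/b.       (placeholder decoded)
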
- private Path makeAbsolute(Path path) {
+ /**
+ * Get the absolute version of the path (fully qualified).
+ * This is public for testing purposes.
+ *
+ * @param path
+ * @return fully qualified path
+ */
+ @VisibleForTesting
+ public Path makeAbsolute(Path path) {
if (path.isAbsolute()) {
return path;
}
@@ -569,6 +1163,10 @@ public class NativeAzureFileSystem extends FileSystem {
return actualStore;
}
+ NativeFileSystemStore getStoreInterface() {
+ return store;
+ }
+
/**
* Gets the metrics source for this file system.
* This is mainly here for unit testing purposes.
@@ -590,6 +1188,145 @@ public class NativeAzureFileSystem extends FileSystem {
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
+ return create(f, permission, overwrite, true,
+ bufferSize, replication, blockSize, progress,
+ (SelfRenewingLease) null);
+ }
+
+ /**
+ * Get a self-renewing lease on the specified file.
+ */
+ public SelfRenewingLease acquireLease(Path path) throws AzureException {
+ String fullKey = pathToKey(makeAbsolute(path));
+ return getStore().acquireLease(fullKey);
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+
+ Path parent = f.getParent();
+
+ // Get exclusive access to the folder if this is a directory designated
+ // for atomic rename. The primary use case is HBase write-ahead
+ // log file management.
+ SelfRenewingLease lease = null;
+ if (store.isAtomicRenameKey(pathToKey(f))) {
+ try {
+ lease = acquireLease(parent);
+ } catch (AzureException e) {
+
+ String errorCode = "";
+ try {
+ StorageException e2 = (StorageException) e.getCause();
+ errorCode = e2.getErrorCode();
+ } catch (Exception e3) {
+ // do nothing if cast fails
+ }
+ if (errorCode.equals("BlobNotFound")) {
+ throw new FileNotFoundException("Cannot create file " +
+ f.getName() + " because parent folder does not exist.");
+ }
+
+ LOG.warn("Got unexpected exception trying to get lease on "
+ + pathToKey(parent) + ". " + e.getMessage());
+ throw e;
+ }
+ }
+
+ // See if the parent folder exists. If not, throw error.
+ // The exists() check will push any pending rename operation forward,
+ // if there is one, and return false.
+ //
+ // At this point, we have exclusive access to the source folder
+ // via the lease, so we will not conflict with an active folder
+ // rename operation.
+ if (!exists(parent)) {
+ try {
+
+ // This'll let the keep-alive thread exit as soon as it wakes up.
+ lease.free();
+ } catch (Exception e) {
+ LOG.warn("Unable to free lease because: " + e.getMessage());
+ }
+ throw new FileNotFoundException("Cannot create file " +
+ f.getName() + " because parent folder does not exist.");
+ }
+
+ // Create file inside folder.
+ FSDataOutputStream out = null;
+ try {
+ out = create(f, permission, overwrite, false,
+ bufferSize, replication, blockSize, progress, lease);
+ } finally {
+ // Release exclusive access to folder.
+ try {
+ if (lease != null) {
+ lease.free();
+ }
+ } catch (Exception e) {
+ IOUtils.cleanup(LOG, out);
+ String msg = "Unable to free lease on " + parent.toUri();
+ LOG.error(msg);
+ throw new IOException(msg, e);
+ }
+ }
+ return out;
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+ EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+
+ // Check whether the file should be appended to or overwritten. Assume the
+ // file is overwritten only if both the CREATE and OVERWRITE create flags
+ // are set. Any other combination of create flags will result in creating
+ // a new file or opening an existing file for append.
+ final EnumSet<CreateFlag> createflags =
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
+ boolean overwrite = flags.containsAll(createflags);
+
+ // Delegate the create non-recursive call.
+ return this.createNonRecursive(f, permission, overwrite,
+ bufferSize, replication, blockSize, progress);
+ }
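
A standalone sketch of the overwrite decision (the demo class name is hypothetical):

    import java.util.EnumSet;
    import org.apache.hadoop.fs.CreateFlag;

    // Hypothetical demo: only the exact CREATE|OVERWRITE combination is
    // treated as an overwrite; anything else creates new or opens for append.
    public class CreateFlagDemo {
      public static void main(String[] args) {
        EnumSet<CreateFlag> required =
            EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
        System.out.println(required.containsAll(required));       // true
        System.out.println(
            EnumSet.of(CreateFlag.CREATE).containsAll(required)); // false
      }
    }
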
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public FSDataOutputStream createNonRecursive(Path f,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ return this.createNonRecursive(f, FsPermission.getFileDefault(),
+ overwrite, bufferSize, replication, blockSize, progress);
+ }
+
+
+ /**
+ * Create an Azure blob and return an output stream to use
+ * to write data to it.
+ *
+ * @param f
+ * @param permission
+ * @param overwrite
+ * @param createParent
+ * @param bufferSize
+ * @param replication
+ * @param blockSize
+ * @param progress
+ * @param parentFolderLease Lease on parent folder (or null if
+ * no lease).
+ * @return
+ * @throws IOException
+ */
+ private FSDataOutputStream create(Path f, FsPermission permission,
+ boolean overwrite, boolean createParent, int bufferSize,
+ short replication, long blockSize, Progressable progress,
+ SelfRenewingLease parentFolderLease)
+ throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating file: " + f.toString());
@@ -620,45 +1357,60 @@ public class NativeAzureFileSystem extends FileSystem {
// already exists.
String parentKey = pathToKey(parentFolder);
FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
- if (parentMetadata != null
- && parentMetadata.isDir()
- && parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
- store.updateFolderLastModifiedTime(parentKey);
+ if (parentMetadata != null && parentMetadata.isDir() &&
+ parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
+ store.updateFolderLastModifiedTime(parentKey, parentFolderLease);
} else {
// Make sure that the parent folder exists.
- mkdirs(parentFolder, permission);
+ // Create it using inherited permissions from the first existing directory going up the path
+ Path firstExisting = parentFolder.getParent();
+ FileMetadata metadata = store.retrieveMetadata(pathToKey(firstExisting));
+ while(metadata == null) {
+ // Guaranteed to terminate properly because we will eventually hit root, which will return non-null metadata
+ firstExisting = firstExisting.getParent();
+ metadata = store.retrieveMetadata(pathToKey(firstExisting));
+ }
+ mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true);
}
}
- // Open the output blob stream based on the encoded key.
- String keyEncoded = encodeKey(key);
-
// Mask the permission first (with the default permission mask as well).
FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile);
PermissionStatus permissionStatus = createPermissionStatus(masked);
- // First create a blob at the real key, pointing back to the temporary file
- // This accomplishes a few things:
- // 1. Makes sure we can create a file there.
- // 2. Makes it visible to other concurrent threads/processes/nodes what
- // we're
- // doing.
- // 3. Makes it easier to restore/cleanup data in the event of us crashing.
- store.storeEmptyLinkFile(key, keyEncoded, permissionStatus);
-
- // The key is encoded to point to a common container at the storage server.
- // This reduces the number of splits on the server side when load balancing.
- // Ingress to Azure storage can take advantage of earlier splits. We remove
- // the root path to the key and prefix a random GUID to the tail (or leaf
- // filename) of the key. Keys are thus broadly and randomly distributed over
- // a single container to ease load balancing on the storage server. When the
- // blob is committed it is renamed to its earlier key. Uncommitted blocks
- // are not cleaned up and we leave it to Azure storage to garbage collect
- // these
- // blocks.
- OutputStream bufOutStream = new NativeAzureFsOutputStream(store.storefile(
- keyEncoded, permissionStatus), key, keyEncoded);
-
+ OutputStream bufOutStream;
+ if (store.isPageBlobKey(key)) {
+ // Store page blobs directly in-place without renames.
+ bufOutStream = store.storefile(key, permissionStatus);
+ } else {
+ // This is a block blob, so open the output blob stream based on the
+ // encoded key.
+ //
+ String keyEncoded = encodeKey(key);
+
+
+ // First create a blob at the real key, pointing back to the temporary file
+ // This accomplishes a few things:
+ // 1. Makes sure we can create a file there.
+ // 2. Makes it visible to other concurrent threads/processes/nodes what
+ // we're
+ // doing.
+ // 3. Makes it easier to restore/cleanup data in the event of us crashing.
+ store.storeEmptyLinkFile(key, keyEncoded, permissionStatus);
+
+ // The key is encoded to point to a common container at the storage server.
+ // This reduces the number of splits on the server side when load balancing.
+ // Ingress to Azure storage can take advantage of earlier splits. We remove
+ // the root path to the key and prefix a random GUID to the tail (or leaf
+ // filename) of the key. Keys are thus broadly and randomly distributed over
+ // a single container to ease load balancing on the storage server. When the
+ // blob is committed it is renamed to its earlier key. Uncommitted blocks
+ // are not cleaned up and we leave it to Azure storage to garbage collect
+ // these
+ // blocks.
+ bufOutStream = new NativeAzureFsOutputStream(store.storefile(
+ keyEncoded, permissionStatus), key, keyEncoded);
+ }
// Construct the data output stream from the buffered output stream.
FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
@@ -678,6 +1430,28 @@ public class NativeAzureFileSystem extends FileSystem {
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
+ return delete(f, recursive, false);
+ }
+
+ /**
+ * Delete the specified file or folder. The parameter
+ * skipParentFolderLastModifiedTimeUpdate
+ * is used in the case of atomic folder rename redo. In that case, there is
+ * a lease on the parent folder, so (without reworking the code) modifying
+ * the parent folder update time would fail because of a conflict with the
+ * lease. Since we are going to delete the folder soon anyway, an accurate
+ * modified time is not necessary, and it's easier to just skip
+ * the modified time update.
+ *
+ * @param f
+ * @param recursive
+ * @param skipParentFolderLastModifiedTimeUpdate If true, don't update the folder last
+ * modified time.
+ * @return true if and only if the file is deleted
+ * @throws IOException
+ */
+ public boolean delete(Path f, boolean recursive,
+ boolean skipParentFolderLastModifiedTimeUpdate) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting file: " + f.toString());
@@ -723,11 +1497,13 @@ public class NativeAzureFileSystem extends FileSystem {
store.storeEmptyFolder(parentKey,
createPermissionStatus(FsPermission.getDefault()));
} else {
- store.updateFolderLastModifiedTime(parentKey);
+ if (!skipParentFolderLastModifiedTimeUpdate) {
+ store.updateFolderLastModifiedTime(parentKey, null);
+ }
}
}
- instrumentation.fileDeleted();
store.delete(key);
+ instrumentation.fileDeleted();
} else {
// The path specifies a folder. Recursively delete all entries under the
// folder.
@@ -784,7 +1560,9 @@ public class NativeAzureFileSystem extends FileSystem {
Path parent = absolutePath.getParent();
if (parent != null && parent.getParent() != null) { // not root
String parentKey = pathToKey(parent);
- store.updateFolderLastModifiedTime(parentKey);
+ if (!skipParentFolderLastModifiedTimeUpdate) {
+ store.updateFolderLastModifiedTime(parentKey, null);
+ }
}
instrumentation.directoryDeleted();
}
@@ -818,6 +1596,13 @@ public class NativeAzureFileSystem extends FileSystem {
LOG.debug("Path " + f.toString() + "is a folder.");
}
+ // If a rename operation for the folder was pending, redo it.
+ // After the redo, the source folder no longer exists, so report
+ // file-not-found to the caller.
+ if (conditionalRedoFolderRename(f)) {
+ throw new FileNotFoundException(
+ absolutePath + ": No such file or directory.");
+ }
+
// Return reference to the directory object.
return newDirectory(meta, absolutePath);
}
@@ -832,9 +1617,38 @@ public class NativeAzureFileSystem extends FileSystem {
}
// File not found. Throw exception no such file or directory.
- // Note: Should never get to this point since the root always exists.
- throw new FileNotFoundException(absolutePath
- + ": No such file or directory.");
+ //
+ throw new FileNotFoundException(
+ absolutePath + ": No such file or directory.");
+ }
+
+ // Return true if there is a rename pending and we redo it, otherwise false.
+ private boolean conditionalRedoFolderRename(Path f) throws IOException {
+
+ // Can't rename /, so return immediately in that case.
+ if (f.getName().equals("")) {
+ return false;
+ }
+
+ // Check if there is a -RenamePending.json file for this folder, and if so,
+ // redo the rename.
+ Path absoluteRenamePendingFile = renamePendingFilePath(f);
+ if (exists(absoluteRenamePendingFile)) {
+ FolderRenamePending pending =
+ new FolderRenamePending(absoluteRenamePendingFile, this);
+ pending.redo();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ // Return the path name that would be used for rename of folder with path f.
+ private Path renamePendingFilePath(Path f) {
+ Path absPath = makeAbsolute(f);
+ String key = pathToKey(absPath);
+ key += "-RenamePending.json";
+ return keyToPath(key);
}
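
For illustration, with hypothetical values, the pending-file path derived here:

    // f    = /user/ehans/folderToRename
    // key  = "user/ehans/folderToRename" + "-RenamePending.json"
    // path = /user/ehans/folderToRename-RenamePending.json
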
@Override
@@ -867,6 +1681,17 @@ public class NativeAzureFileSystem extends FileSystem {
}
String partialKey = null;
PartialListing listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
+
+ // For any -RenamePending.json files in the listing,
+ // push the rename forward.
+ boolean renamed = conditionalRedoFolderRenames(listing);
+
+ // If any renames were redone, get another listing,
+ // since the current one may have changed due to the redo.
+ if (renamed) {
+ listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
+ }
+
for (FileMetadata fileMetadata : listing.getFiles()) {
Path subpath = keyToPath(fileMetadata.getKey());
@@ -903,25 +1728,62 @@ public class NativeAzureFileSystem extends FileSystem {
return status.toArray(new FileStatus[0]);
}
+ // Redo any folder renames needed if there are rename pending files in the
+ // directory listing. Return true if one or more redo operations were done.
+ private boolean conditionalRedoFolderRenames(PartialListing listing)
+ throws IllegalArgumentException, IOException {
+ boolean renamed = false;
+ for (FileMetadata fileMetadata : listing.getFiles()) {
+ Path subpath = keyToPath(fileMetadata.getKey());
+ if (isRenamePendingFile(subpath)) {
+ FolderRenamePending pending =
+ new FolderRenamePending(subpath, this);
+ pending.redo();
+ renamed = true;
+ }
+ }
+ return renamed;
+ }
+
+ // True if this is a folder rename pending file, else false.
+ private boolean isRenamePendingFile(Path path) {
+ return path.toString().endsWith(FolderRenamePending.SUFFIX);
+ }
+
private FileStatus newFile(FileMetadata meta, Path path) {
- return new FileStatus(meta.getLength(), false, 1, blockSize,
- meta.getLastModified(), 0, meta.getPermissionStatus().getPermission(),
- meta.getPermissionStatus().getUserName(), meta.getPermissionStatus()
- .getGroupName(),
+ return new FileStatus (
+ meta.getLength(),
+ false,
+ 1,
+ blockSize,
+ meta.getLastModified(),
+ 0,
+ meta.getPermissionStatus().getPermission(),
+ meta.getPermissionStatus().getUserName(),
+ meta.getPermissionStatus().getGroupName(),
path.makeQualified(getUri(), getWorkingDirectory()));
}
private FileStatus newDirectory(FileMetadata meta, Path path) {
- return new FileStatus(0, true, 1, blockSize, meta == null ? 0
- : meta.getLastModified(), 0, meta == null ? FsPermission.getDefault()
- : meta.getPermissionStatus().getPermission(), meta == null ? "" : meta
- .getPermissionStatus().getUserName(), meta == null ? "" : meta
- .getPermissionStatus().getGroupName(), path.makeQualified(getUri(),
- getWorkingDirectory()));
+ return new FileStatus (
+ 0,
+ true,
+ 1,
+ blockSize,
+ meta == null ? 0 : meta.getLastModified(),
+ 0,
+ meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(),
+ meta == null ? "" : meta.getPermissionStatus().getUserName(),
+ meta == null ? "" : meta.getPermissionStatus().getGroupName(),
+ path.makeQualified(getUri(), getWorkingDirectory()));
}
private static enum UMaskApplyMode {
- NewFile, NewDirectory, ChangeExistingFile, ChangeExistingDirectory,
+ NewFile,
+ NewDirectory,
+ NewDirectoryNoUmask,
+ ChangeExistingFile,
+ ChangeExistingDirectory,
}
/**
@@ -958,13 +1820,19 @@ public class NativeAzureFileSystem extends FileSystem {
private PermissionStatus createPermissionStatus(FsPermission permission)
throws IOException {
// Create the permission status for this file based on current user
- return new PermissionStatus(UserGroupInformation.getCurrentUser()
- .getShortUserName(), getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME,
- AZURE_DEFAULT_GROUP_DEFAULT), permission);
+ return new PermissionStatus(
+ UserGroupInformation.getCurrentUser().getShortUserName(),
+ getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME,
+ AZURE_DEFAULT_GROUP_DEFAULT),
+ permission);
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ return mkdirs(f, permission, false);
+ }
+
+ public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating directory: " + f.toString());
}
@@ -975,24 +1843,31 @@ public class NativeAzureFileSystem extends FileSystem {
}
Path absolutePath = makeAbsolute(f);
- PermissionStatus permissionStatus = createPermissionStatus(applyUMask(
- permission, UMaskApplyMode.NewDirectory));
+ PermissionStatus permissionStatus = null;
+ if(noUmask) {
+ // ensure owner still has wx permissions at the minimum
+ permissionStatus = createPermissionStatus(
+ applyUMask(FsPermission.createImmutable((short) (permission.toShort() | USER_WX_PERMISSION)),
+ UMaskApplyMode.NewDirectoryNoUmask));
+ } else {
+ permissionStatus = createPermissionStatus(
+ applyUMask(permission, UMaskApplyMode.NewDirectory));
+ }
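
As a quick sketch of the permission floor applied above (the octal values are hypothetical):

    // OR in 0300 (owner write+execute) so the owner can always create
    // entries inside the directory whose permissions were inherited.
    short inherited = 0500;                        // r-x------
    short effective = (short) (inherited | 0300);  // 0700 == rwx------
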
+
ArrayList<String> keysToCreateAsFolder = new ArrayList<String>();
ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>();
boolean childCreated = false;
// Check that there is no file in the parent chain of the given path.
- // Stop when you get to the root
- for (Path current = absolutePath, parent = current.getParent(); parent != null; current = parent, parent = current
- .getParent()) {
+ for (Path current = absolutePath, parent = current.getParent();
+ parent != null; // Stop when you get to the root
+ current = parent, parent = current.getParent()) {
String currentKey = pathToKey(current);
FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
if (currentMetadata != null && !currentMetadata.isDir()) {
- throw new IOException("Cannot create directory " + f + " because "
- + current + " is an existing file.");
- } else if (currentMetadata == null
- || (currentMetadata.isDir() && currentMetadata
- .getBlobMaterialization() == BlobMaterialization.Implicit)) {
+ throw new IOException("Cannot create directory " + f + " because " +
+ current + " is an existing file.");
+ } else if (currentMetadata == null) {
keysToCreateAsFolder.add(currentKey);
childCreated = true;
} else {
@@ -1009,18 +1884,8 @@ public class NativeAzureFileSystem extends FileSystem {
store.storeEmptyFolder(currentKey, permissionStatus);
}
- // Take the time after finishing mkdirs as the modified time, and update all
- // the existing directories' modified time to it uniformly.
- final Calendar lastModifiedCalendar = Calendar
- .getInstance(Utility.LOCALE_US);
- lastModifiedCalendar.setTimeZone(Utility.UTC_ZONE);
- Date lastModified = lastModifiedCalendar.getTime();
- for (String key : keysToUpdateAsFolder) {
- store.updateFolderLastModifiedTime(key, lastModified);
- }
-
instrumentation.directoryCreated();
-
+
// otherwise throws exception
return true;
}
@@ -1043,12 +1908,14 @@ public class NativeAzureFileSystem extends FileSystem {
}
return new FSDataInputStream(new BufferedFSInputStream(
- new NativeAzureFsInputStream(store.retrieve(key), key), bufferSize));
+ new NativeAzureFsInputStream(store.retrieve(key), key, meta.getLength()), bufferSize));
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
+ FolderRenamePending renamePending = null;
+
if (LOG.isDebugEnabled()) {
LOG.debug("Moving " + src + " to " + dst);
}
@@ -1065,91 +1932,28 @@ public class NativeAzureFileSystem extends FileSystem {
return false;
}
- FileMetadata srcMetadata = store.retrieveMetadata(srcKey);
- if (srcMetadata == null) {
- // Source doesn't exist
- if (LOG.isDebugEnabled()) {
- LOG.debug("Source " + src + " doesn't exist, failing the rename.");
- }
- return false;
- }
-
// Figure out the final destination
Path absoluteDst = makeAbsolute(dst);
String dstKey = pathToKey(absoluteDst);
FileMetadata dstMetadata = store.retrieveMetadata(dstKey);
-
- // directory rename validations
- if (srcMetadata.isDir()) {
-
- // rename dir to self is an error
- if (srcKey.equals(dstKey)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming directory to itself is disallowed. path=" + src);
- }
- return false;
- }
-
- // rename dir to (sub-)child of self is an error. see
- // FileSystemContractBaseTest.testRenameChildDirForbidden
- if (dstKey.startsWith(srcKey + PATH_DELIMITER)) {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming directory to itself is disallowed. src=" + src
- + " dest=" + dst);
- }
- return false;
- }
- }
-
- // file rename early checks
- if (!srcMetadata.isDir()) {
- if (srcKey.equals(dstKey)) {
- // rename file to self is OK
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming file to itself. This is allowed and is treated as no-op. path="
- + src);
- }
- return true;
- }
- }
-
- // More validations..
- // If target is dir but target already exists, alter the dst to be a
- // subfolder.
- // eg move("/a/file.txt", "/b") where "/b" already exists causes the target
- // to be "/c/file.txt
if (dstMetadata != null && dstMetadata.isDir()) {
+ // It's an existing directory.
dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
- // Best would be to update dstMetadata, but it is not used further, so set
- // it to null and skip the additional cost
- dstMetadata = null;
- // dstMetadata = store.retrieveMetadata(dstKey);
if (LOG.isDebugEnabled()) {
LOG.debug("Destination " + dst
+ " is a directory, adjusted the destination to be " + dstKey);
}
-
- // rename dir to self is an error
- if (srcKey.equals(dstKey)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming directory to itself is disallowed. path=" + src);
- }
- return false;
- }
-
} else if (dstMetadata != null) {
- // Otherwise, attempting to overwrite a file is error
+ // Attempting to overwrite a file using rename()
if (LOG.isDebugEnabled()) {
LOG.debug("Destination " + dst
+ " is an already existing file, failing the rename.");
}
return false;
} else {
- // Either dir or file and target doesn't exist.. Check that the parent
- // directory exists.
- FileMetadata parentOfDestMetadata = store
- .retrieveMetadata(pathToKey(absoluteDst.getParent()));
+ // Check that the parent directory exists.
+ FileMetadata parentOfDestMetadata =
+ store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
if (parentOfDestMetadata == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Parent of the destination " + dst
@@ -1164,88 +1968,136 @@ public class NativeAzureFileSystem extends FileSystem {
return false;
}
}
-
- // Validations complete, do the move.
- if (!srcMetadata.isDir()) {
+ FileMetadata srcMetadata = store.retrieveMetadata(srcKey);
+ if (srcMetadata == null) {
+ // Source doesn't exist
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Source " + src + " doesn't exist, failing the rename.");
+ }
+ return false;
+ } else if (!srcMetadata.isDir()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Source " + src + " found as a file, renaming.");
}
store.rename(srcKey, dstKey);
} else {
- // Move everything inside the folder.
- String priorLastKey = null;
- // Calculate the index of the part of the string to be moved. That
- // is everything on the path up to the folder name.
- do {
- // List all blobs rooted at the source folder.
- PartialListing listing = store.listAll(srcKey, AZURE_LIST_ALL,
- AZURE_UNBOUNDED_DEPTH, priorLastKey);
-
- // Rename all the files in the folder.
- for (FileMetadata file : listing.getFiles()) {
- // Rename all materialized entries under the folder to point to the
- // final destination.
- if (file.getBlobMaterialization() == BlobMaterialization.Explicit) {
- String srcName = file.getKey();
- String suffix = srcName.substring(srcKey.length());
- String dstName = dstKey + suffix;
- store.rename(srcName, dstName);
- }
- }
- priorLastKey = listing.getPriorLastKey();
- } while (priorLastKey != null);
- // Rename the top level empty blob for the folder.
- if (srcMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
- store.rename(srcKey, dstKey);
+ // Prepare for, execute, and clean up after the rename of all files in
+ // the folder and its root file, and update the last modified time of the
+ // source and target parent folders. The operation can be redone if it
+ // fails part way through, by applying the "Rename Pending" file.
+
+ // The following code (internally) only does atomic rename preparation
+ // and lease management for page blob folders, limiting the scope of the
+ // operation to HBase log file folders, where atomic rename is required.
+ // In the future, we could generalize it easily to all folders.
+ renamePending = prepareAtomicFolderRename(srcKey, dstKey);
+ renamePending.execute();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renamed " + src + " to " + dst + " successfully.");
}
+ renamePending.cleanup();
+ return true;
}
- // Update both source and destination parent folder last modified time.
- Path srcParent = makeAbsolute(keyToPath(srcKey)).getParent();
- if (srcParent != null && srcParent.getParent() != null) { // not root
- String srcParentKey = pathToKey(srcParent);
+ // Update the last-modified time of the parent folders of both source
+ // and destination.
+ updateParentFolderLastModifiedTime(srcKey);
+ updateParentFolderLastModifiedTime(dstKey);
- // ensure the srcParent is a materialized folder
- FileMetadata srcParentMetadata = store.retrieveMetadata(srcParentKey);
- if (srcParentMetadata.isDir()
- && srcParentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
- store.storeEmptyFolder(srcParentKey,
- createPermissionStatus(FsPermission.getDefault()));
- }
-
- store.updateFolderLastModifiedTime(srcParentKey);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renamed " + src + " to " + dst + " successfully.");
}
+ return true;
+ }
- Path destParent = makeAbsolute(keyToPath(dstKey)).getParent();
- if (destParent != null && destParent.getParent() != null) { // not root
- String dstParentKey = pathToKey(destParent);
+ /**
+ * Update the last-modified time of the parent folder of the file
+ * identified by key.
+ * @param key
+ * @throws IOException
+ */
+ private void updateParentFolderLastModifiedTime(String key)
+ throws IOException {
+ Path parent = makeAbsolute(keyToPath(key)).getParent();
+ if (parent != null && parent.getParent() != null) { // not root
+ String parentKey = pathToKey(parent);
- // ensure the dstParent is a materialized folder
- FileMetadata dstParentMetadata = store.retrieveMetadata(dstParentKey);
- if (dstParentMetadata.isDir()
- && dstParentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
- store.storeEmptyFolder(dstParentKey,
- createPermissionStatus(FsPermission.getDefault()));
- }
+ // ensure the parent is a materialized folder
+ FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
+ // The metadata could be null if the implicit folder only contains a
+ // single file. In this case, the parent folder no longer exists if the
+ // file is renamed; so we can safely ignore the null pointer case.
+ if (parentMetadata != null) {
+ if (parentMetadata.isDir()
+ && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
+ store.storeEmptyFolder(parentKey,
+ createPermissionStatus(FsPermission.getDefault()));
+ }
- store.updateFolderLastModifiedTime(dstParentKey);
+ store.updateFolderLastModifiedTime(parentKey, null);
+ }
}
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renamed " + src + " to " + dst + " successfully.");
+ /**
+ * If the source is a page blob folder,
+ * prepare to rename this folder atomically. This means getting exclusive
+ * access to the source folder, and recording the actions to be performed for
+ * this rename in a "Rename Pending" file. This code was designed to
+ * meet the needs of HBase, which requires atomic rename of write-ahead log
+ * (WAL) folders for correctness.
+ *
+ * Before calling this method, the caller must ensure that the source is a
+ * folder.
+ *
+ * For non-page-blob directories, prepare the in-memory information needed,
+ * but don't take the lease or write the redo file. This is done to limit the
+ * scope of atomic folder rename to HBase, at least at the time of writing
+ * this code.
+ *
+ * @param srcKey Source folder name.
+ * @param dstKey Destination folder name.
+ * @throws IOException
+ */
+ private FolderRenamePending prepareAtomicFolderRename(
+ String srcKey, String dstKey) throws IOException {
+
+ if (store.isAtomicRenameKey(srcKey)) {
+
+ // Block unwanted concurrent access to source folder.
+ SelfRenewingLease lease = leaseSourceFolder(srcKey);
+
+ // Prepare in-memory information needed to do or redo a folder rename.
+ FolderRenamePending renamePending =
+ new FolderRenamePending(srcKey, dstKey, lease, this);
+
+ // Save it to persistent storage to help recover if the operation fails.
+ renamePending.writeFile(this);
+ return renamePending;
+ } else {
+ FolderRenamePending renamePending =
+ new FolderRenamePending(srcKey, dstKey, null, this);
+ return renamePending;
}
- return true;
}
/**
- * Return an array containing hostnames, offset and size of portions of the
- * given file. For WASB we'll just lie and give fake hosts to make sure we get
- * many splits in MR jobs.
+ * Get a self-renewing Azure blob lease on the source folder zero-byte file.
+ */
+ private SelfRenewingLease leaseSourceFolder(String srcKey)
+ throws AzureException {
+ return store.acquireLease(srcKey);
+ }
+
+ /**
+ * Return an array containing hostnames, offset and size of
+ * portions of the given file. For WASB we'll just lie and give
+ * fake hosts to make sure we get many splits in MR jobs.
*/
@Override
- public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
- long len) throws IOException {
+ public BlockLocation[] getFileBlockLocations(FileStatus file,
+ long start, long len) throws IOException {
if (file == null) {
return null;
}
@@ -1306,11 +2158,12 @@ public class NativeAzureFileSystem extends FileSystem {
if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
// It's an implicit folder, need to materialize it.
store.storeEmptyFolder(key, createPermissionStatus(permission));
- } else if (!metadata.getPermissionStatus().getPermission()
- .equals(permission)) {
- store.changePermissionStatus(key, new PermissionStatus(metadata
- .getPermissionStatus().getUserName(), metadata.getPermissionStatus()
- .getGroupName(), permission));
+ } else if (!metadata.getPermissionStatus().getPermission().
+ equals(permission)) {
+ store.changePermissionStatus(key, new PermissionStatus(
+ metadata.getPermissionStatus().getUserName(),
+ metadata.getPermissionStatus().getGroupName(),
+ permission));
}
}
@@ -1324,10 +2177,11 @@ public class NativeAzureFileSystem extends FileSystem {
throw new FileNotFoundException("File doesn't exist: " + p);
}
PermissionStatus newPermissionStatus = new PermissionStatus(
- username == null ? metadata.getPermissionStatus().getUserName()
- : username, groupname == null ? metadata.getPermissionStatus()
- .getGroupName() : groupname, metadata.getPermissionStatus()
- .getPermission());
+ username == null ?
+ metadata.getPermissionStatus().getUserName() : username,
+ groupname == null ?
+ metadata.getPermissionStatus().getGroupName() : groupname,
+ metadata.getPermissionStatus().getPermission());
if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
// It's an implicit folder, need to materialize it.
store.storeEmptyFolder(key, newPermissionStatus);
@@ -1341,12 +2195,12 @@ public class NativeAzureFileSystem extends FileSystem {
if (isClosed) {
return;
}
-
+
// Call the base close() to close any resources there.
super.close();
- // Close the store
+ // Close the store to close any resources there - e.g. the bandwidth
+ // updater thread would be stopped at this time.
store.close();
-
// Notify the metrics system that this file system is closed, which may
// trigger one final metrics push to get the accurate final file system
// metrics out.
@@ -1364,16 +2218,17 @@ public class NativeAzureFileSystem extends FileSystem {
}
/**
- * A handler that defines what to do with blobs whose upload was interrupted.
+ * A handler that defines what to do with blobs whose upload was
+ * interrupted.
*/
private abstract class DanglingFileHandler {
abstract void handleFile(FileMetadata file, FileMetadata tempFile)
- throws IOException;
+ throws IOException;
}
/**
- * Handler implementation for just deleting dangling files and cleaning them
- * up.
+ * Handler implementation for just deleting dangling files and cleaning
+ * them up.
*/
private class DanglingFileDeleter extends DanglingFileHandler {
@Override
@@ -1388,8 +2243,8 @@ public class NativeAzureFileSystem extends FileSystem {
}
/**
- * Handler implementation for just moving dangling files to recovery location
- * (/lost+found).
+ * Handler implementation for just moving dangling files to recovery
+ * location (/lost+found).
*/
private class DanglingFileRecoverer extends DanglingFileHandler {
private final Path destination;
@@ -1405,8 +2260,8 @@ public class NativeAzureFileSystem extends FileSystem {
LOG.debug("Recovering " + file.getKey());
}
// Move to the final destination
- String finalDestinationKey = pathToKey(new Path(destination,
- file.getKey()));
+ String finalDestinationKey =
+ pathToKey(new Path(destination, file.getKey()));
store.rename(tempFile.getKey(), finalDestinationKey);
if (!finalDestinationKey.equals(file.getKey())) {
// Delete the empty link file now that we've restored it.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 4e1d0b6..0229cb7 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -53,6 +53,10 @@ interface NativeFileSystemStore {
DataOutputStream storefile(String key, PermissionStatus permissionStatus)
throws AzureException;
+ boolean isPageBlobKey(String key);
+
+ boolean isAtomicRenameKey(String key);
+
void storeEmptyLinkFile(String key, String tempBlobKey,
PermissionStatus permissionStatus) throws AzureException;
@@ -74,9 +78,12 @@ interface NativeFileSystemStore {
void rename(String srcKey, String dstKey) throws IOException;
+ void rename(String srcKey, String dstKey, boolean acquireLease, SelfRenewingLease existingLease)
+ throws IOException;
+
/**
* Delete all keys with the given prefix. Used for testing.
- *
+ *
* @throws IOException
*/
@VisibleForTesting
@@ -84,15 +91,20 @@ interface NativeFileSystemStore {
/**
* Diagnostic method to dump state to the console.
- *
+ *
* @throws IOException
*/
void dump() throws IOException;
void close();
- void updateFolderLastModifiedTime(String key) throws AzureException;
-
- void updateFolderLastModifiedTime(String key, Date lastModified)
+ void updateFolderLastModifiedTime(String key, SelfRenewingLease folderLease)
throws AzureException;
+
+ void updateFolderLastModifiedTime(String key, Date lastModified,
+ SelfRenewingLease folderLease) throws AzureException;
+
+ void delete(String key, SelfRenewingLease lease) throws IOException;
+
+ SelfRenewingLease acquireLease(String key) throws AzureException;
}
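The new lease-aware overloads are meant to compose; a hedged sketch of one plausible call sequence follows (lease release is elided, since SelfRenewingLease's API is not part of this file):

    // Block concurrent writers to the source folder, then rename under the
    // existing lease rather than letting rename() acquire its own.
    SelfRenewingLease lease = store.acquireLease(srcKey);
    store.rename(srcKey, dstKey, false, lease);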
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobFormatHelpers.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobFormatHelpers.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobFormatHelpers.java
new file mode 100644
index 0000000..ad11aac
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobFormatHelpers.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.nio.ByteBuffer;
+
+import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+
+/**
+ * Constants and helper methods for ASV's custom data format in page blobs.
+ */
+final class PageBlobFormatHelpers {
+ public static final short PAGE_SIZE = 512;
+ public static final short PAGE_HEADER_SIZE = 2;
+ public static final short PAGE_DATA_SIZE = PAGE_SIZE - PAGE_HEADER_SIZE;
+
+ // Hide constructor for utility class.
+ private PageBlobFormatHelpers() {
+
+ }
+
+ /**
+ * Stores the given short as a two-byte array.
+ */
+ public static byte[] fromShort(short s) {
+ return ByteBuffer.allocate(2).putShort(s).array();
+ }
+
+ /**
+ * Retrieves a short from the given two bytes.
+ */
+ public static short toShort(byte firstByte, byte secondByte) {
+ return ByteBuffer.wrap(new byte[] { firstByte, secondByte })
+ .getShort();
+ }
+
+ public static BlobRequestOptions withMD5Checking() {
+ BlobRequestOptions options = new BlobRequestOptions();
+ options.setUseTransactionalContentMD5(true);
+ return options;
+ }
+}
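To see the format constants in action: every 512-byte page begins with a 2-byte big-endian header holding the count of valid data bytes (0 to 510) in that page. A self-contained demo; the class name and values are illustrative only:

    import java.nio.ByteBuffer;

    public class PageLayoutDemo {
      public static void main(String[] args) {
        final int pageSize = 512, headerSize = 2;
        final short dataLength = 200;            // valid bytes in this page
        byte[] page = new byte[pageSize];
        // Write the header, as fromShort() does (ByteBuffer is big-endian
        // by default).
        byte[] header = ByteBuffer.allocate(2).putShort(dataLength).array();
        System.arraycopy(header, 0, page, 0, headerSize);
        // Read it back, as toShort() does.
        short decoded =
            ByteBuffer.wrap(new byte[] { page[0], page[1] }).getShort();
        System.out.println(decoded + " of " + (pageSize - headerSize)
            + " possible data bytes");           // "200 of 510 ..."
      }
    }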
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
new file mode 100644
index 0000000..62b47ee
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_DATA_SIZE;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_HEADER_SIZE;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_SIZE;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.toShort;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.withMD5Checking;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
+
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+import com.microsoft.windowsazure.storage.blob.PageRange;
+
+/**
+ * An input stream that reads file data from a page blob stored
+ * using ASV's custom format.
+ */
+
+final class PageBlobInputStream extends InputStream {
+ private static final Log LOG = LogFactory.getLog(PageBlobInputStream.class);
+
+ // The blob we're reading from.
+ private final CloudPageBlobWrapper blob;
+ // The operation context to use for storage requests.
+ private final OperationContext opContext;
+ // The number of pages remaining to be read from the server.
+ private long numberOfPagesRemaining;
+ // The current byte offset to start reading from the server next,
+ // equivalent to (total number of pages we've read) * (page size).
+ private long currentOffsetInBlob;
+ // The buffer holding the current data we last read from the server.
+ private byte[] currentBuffer;
+ // The current byte offset we're at in the buffer.
+ private int currentOffsetInBuffer;
+ // Maximum number of pages to get per any one request.
+ private static final int MAX_PAGES_PER_DOWNLOAD =
+ 4 * 1024 * 1024 / PAGE_SIZE;
+ // Whether the stream has been closed.
+ private boolean closed = false;
+ // Total stream size, or -1 if not initialized.
+ long pageBlobSize = -1;
+ // Current position in stream of valid data.
+ long filePosition = 0;
+
+ /**
+ * Helper method to extract the actual data size of a page blob.
+ * This typically involves 2 service requests (one for page ranges, another
+ * for the last page's data).
+ *
+ * @param blob The blob to get the size from.
+ * @param opContext The operation context to use for the requests.
+ * @return The total data size of the blob in bytes.
+ * @throws IOException If the format is corrupt.
+ * @throws StorageException If anything goes wrong in the requests.
+ */
+ public static long getPageBlobSize(CloudPageBlobWrapper blob,
+ OperationContext opContext) throws IOException, StorageException {
+ // Get the page ranges for the blob. There should be one range starting
+ // at byte 0, but we tolerate (and ignore) ranges after the first one.
+ ArrayList<PageRange> pageRanges =
+ blob.downloadPageRanges(new BlobRequestOptions(), opContext);
+ if (pageRanges.size() == 0) {
+ return 0;
+ }
+ if (pageRanges.get(0).getStartOffset() != 0) {
+ // Not expected: we always upload our page blobs as a contiguous range
+ // starting at byte 0.
+ throw badStartRangeException(blob, pageRanges.get(0));
+ }
+ long totalRawBlobSize = pageRanges.get(0).getEndOffset() + 1;
+
+ // Get the last page.
+ long lastPageStart = totalRawBlobSize - PAGE_SIZE;
+ ByteArrayOutputStream baos =
+ new ByteArrayOutputStream(PageBlobFormatHelpers.PAGE_SIZE);
+ blob.downloadRange(lastPageStart, PAGE_SIZE, baos,
+ new BlobRequestOptions(), opContext);
+
+ byte[] lastPage = baos.toByteArray();
+ short lastPageSize = getPageSize(blob, lastPage, 0);
+ long totalNumberOfPages = totalRawBlobSize / PAGE_SIZE;
+ return (totalNumberOfPages - 1) * PAGE_DATA_SIZE + lastPageSize;
+ }
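A worked example of this arithmetic, with hypothetical numbers: if the blob's only page range is [0, 1535], totalRawBlobSize is 1536 and there are 3 pages; if the last page's header reads 100, the logical size is (3 - 1) * 510 + 100 = 1120 bytes.

    // Hypothetical numbers mirroring getPageBlobSize():
    long totalRawBlobSize = 1535 + 1;                  // endOffset + 1
    long totalNumberOfPages = totalRawBlobSize / 512;  // 3 pages
    short lastPageSize = 100;                          // from the last header
    long size = (totalNumberOfPages - 1) * 510 + lastPageSize;  // 1120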
+
+ /**
+ * Constructs a stream over the given page blob.
+ */
+ public PageBlobInputStream(CloudPageBlobWrapper blob,
+ OperationContext opContext)
+ throws IOException {
+ this.blob = blob;
+ this.opContext = opContext;
+ ArrayList<PageRange> allRanges;
+ try {
+ allRanges =
+ blob.downloadPageRanges(new BlobRequestOptions(), opContext);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ if (allRanges.size() > 0) {
+ if (allRanges.get(0).getStartOffset() != 0) {
+ throw badStartRangeException(blob, allRanges.get(0));
+ }
+ if (allRanges.size() > 1) {
+ LOG.warn(String.format(
+ "Blob %s has %d page ranges beyond the first range. "
+ + "Only reading the first range.",
+ blob.getUri(), allRanges.size() - 1));
+ }
+ numberOfPagesRemaining =
+ (allRanges.get(0).getEndOffset() + 1) / PAGE_SIZE;
+ } else {
+ numberOfPagesRemaining = 0;
+ }
+ }
+
+ /** Return the number of bytes remaining in the stream if that count is
+ * less than or equal to {@link Integer#MAX_VALUE}; otherwise return
+ * {@link Integer#MAX_VALUE}.
+ *
+ * This is to match the behavior of DFSInputStream.available(),
+ * which some clients may rely on (HBase write-ahead log reading in
+ * particular).
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ if (pageBlobSize == -1) {
+ try {
+ pageBlobSize = getPageBlobSize(blob, opContext);
+ } catch (StorageException e) {
+ throw new IOException("Unable to get page blob size.", e);
+ }
+ }
+
+ final long remaining = pageBlobSize - filePosition;
+ return remaining <= Integer.MAX_VALUE ?
+ (int) remaining : Integer.MAX_VALUE;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ closed = true;
+ }
+
+ private boolean dataAvailableInBuffer() {
+ return currentBuffer != null
+ && currentOffsetInBuffer < currentBuffer.length;
+ }
+
+ /**
+ * Check our buffer and download more from the server if needed.
+ * @return true if there's more data in the buffer, false if we're done.
+ * @throws IOException
+ */
+ private synchronized boolean ensureDataInBuffer() throws IOException {
+ if (dataAvailableInBuffer()) {
+ // We still have some data in our buffer.
+ return true;
+ }
+ currentBuffer = null;
+ if (numberOfPagesRemaining == 0) {
+ // No more data to read.
+ return false;
+ }
+ final long pagesToRead = Math.min(MAX_PAGES_PER_DOWNLOAD,
+ numberOfPagesRemaining);
+ final int bufferSize = (int) (pagesToRead * PAGE_SIZE);
+
+ // Download page to current buffer.
+ try {
+ // Create a byte array output stream to capture the results of the
+ // download.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
+ blob.downloadRange(currentOffsetInBlob, bufferSize, baos,
+ withMD5Checking(), opContext);
+ currentBuffer = baos.toByteArray();
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ numberOfPagesRemaining -= pagesToRead;
+ currentOffsetInBlob += bufferSize;
+ currentOffsetInBuffer = PAGE_HEADER_SIZE;
+
+ // Since we just downloaded a new buffer, validate its consistency.
+ validateCurrentBufferConsistency();
+
+ return true;
+ }
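The chunking above caps each service call at 4 MB: MAX_PAGES_PER_DOWNLOAD is 4 * 1024 * 1024 / 512 = 8192 pages. A quick check of the bookkeeping after one full-sized download, with hypothetical values:

    long pagesToRead = 8192;                      // min(8192, pages left)
    int bufferSize = (int) (pagesToRead * 512);   // 4 MB in this request
    // currentOffsetInBlob advances by bufferSize (raw bytes), while
    // currentOffsetInBuffer resets to PAGE_HEADER_SIZE (2) so that reads
    // start just past the first page's header.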
+
+ private void validateCurrentBufferConsistency()
+ throws IOException {
+ if (currentBuffer.length % PAGE_SIZE != 0) {
+ throw new AssertionError("Unexpected buffer size: "
+ + currentBuffer.length);
+ }
+ int numberOfPages = currentBuffer.length / PAGE_SIZE;
+ for (int page = 0; page < numberOfPages; page++) {
+ short currentPageSize = getPageSize(blob, currentBuffer,
+ page * PAGE_SIZE);
+ // Calculate the number of pages that exist after this one
+ // in the blob.
+ long totalPagesAfterCurrent =
+ (numberOfPages - page - 1) + numberOfPagesRemaining;
+ // Only the last page is allowed to be not filled completely.
+ if (currentPageSize < PAGE_DATA_SIZE
+ && totalPagesAfterCurrent > 0) {
+ throw fileCorruptException(blob, String.format(
+ "Page with partial data found in the middle (%d pages from the"
+ + " end) that only has %d bytes of data.",
+ totalPagesAfterCurrent, currentPageSize));
+ }
+ }
+ }
+
+ // Reads the page size from the page header at the given offset.
+ private static short getPageSize(CloudPageBlobWrapper blob,
+ byte[] data, int offset) throws IOException {
+ short pageSize = toShort(data[offset], data[offset + 1]);
+ if (pageSize < 0 || pageSize > PAGE_DATA_SIZE) {
+ throw fileCorruptException(blob, String.format(
+ "Unexpected page size in the header: %d.",
+ pageSize));
+ }
+ return pageSize;
+ }
+
+ @Override
+ public synchronized int read(byte[] outputBuffer, int offset, int len)
+ throws IOException {
+ int numberOfBytesRead = 0;
+ while (len > 0) {
+ if (!ensureDataInBuffer()) {
+ filePosition += numberOfBytesRead;
+ return numberOfBytesRead;
+ }
+ int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
+ int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage);
+ System.arraycopy(currentBuffer, currentOffsetInBuffer, outputBuffer,
+ offset, numBytesToRead);
+ numberOfBytesRead += numBytesToRead;
+ offset += numBytesToRead;
+ len -= numBytesToRead;
+ if (numBytesToRead == bytesRemainingInCurrentPage) {
+ // We've finished this page, move on to the next.
+ advancePagesInBuffer(1);
+ } else {
+ currentOffsetInBuffer += numBytesToRead;
+ }
+ }
+ filePosition += numberOfBytesRead;
+ return numberOfBytesRead;
+ }
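Because this method returns 0, rather than -1, at end of stream, a copy loop over it should test for a positive return. A hedged usage sketch; `in` and `out` are assumed to be an open PageBlobInputStream and any OutputStream:

    byte[] buf = new byte[64 * 1024];
    int n;
    while ((n = in.read(buf, 0, buf.length)) > 0) {
      out.write(buf, 0, n);
    }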
+
+ @Override
+ public int read() throws IOException {
+ byte[] oneByte = new byte[1];
+ if (read(oneByte) == 0) {
+ // read(byte[], int, int) above returns 0, not -1, at end of stream.
+ return -1;
+ }
+ // Mask so bytes >= 0x80 come back as unsigned values in [0, 255], as the
+ // InputStream contract requires, instead of sign-extending to negative ints.
+ return oneByte[0] & 0xFF;
+ }
+
+ /**
+ * Skips over and discards n bytes of data from this input stream.
+ * @param n the number of bytes to be skipped.
+ * @return the actual number of bytes skipped.
+ */
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ long skipped = skipImpl(n);
+ filePosition += skipped; // track the position in the stream
+ return skipped;
+ }
+
+ private long skipImpl(long n) throws IOException {
+
+ if (n == 0) {
+ return 0;
+ }
+
+ // First skip within the current buffer as much as possible.
+ long skippedWithinBuffer = skipWithinBuffer(n);
+ if (skippedWithinBuffer > n) {
+ // TO CONSIDER: Using a contracts framework such as Google's cofoja for
+ // these post-conditions.
+ throw new AssertionError(String.format(
+ "Bug in skipWithinBuffer: it skipped over %d bytes when asked to "
+ + "skip %d bytes.", skippedWithinBuffer, n));
+ }
+ n -= skippedWithinBuffer;
+ long skipped = skippedWithinBuffer;
+
+ // Empty the current buffer, we're going beyond it.
+ currentBuffer = null;
+
+ // Skip over whole pages as necessary without retrieving them from the
+ // server. Never skip the last remaining page, whose header is needed to
+ // determine how much data the blob actually ends with.
+ long pagesToSkipOver = Math.min(
+ n / PAGE_DATA_SIZE,
+ Math.max(0, numberOfPagesRemaining - 1));
+ numberOfPagesRemaining -= pagesToSkipOver;
+ currentOffsetInBlob += pagesToSkipOver * PAGE_SIZE;
+ skipped += pagesToSkipOver * PAGE_DATA_SIZE;
+ n -= pagesToSkipOver * PAGE_DATA_SIZE;
+ if (n == 0) {
+ return skipped;
+ }
+
+ // Now read in at the current position, and skip within current buffer.
+ if (!ensureDataInBuffer()) {
+ return skipped;
+ }
+ return skipped + skipWithinBuffer(n);
+ }
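A worked example of the whole-page fast path, with hypothetical numbers: nothing buffered, 10 pages remaining, n = 2000. The stream skips floor(2000 / 510) = 3 whole pages without downloading them, then picks up the leftover inside the next buffer:

    // Hypothetical numbers mirroring skipImpl():
    long n = 2000;
    long pagesToSkipOver = Math.min(n / 510, 10 - 1);  // 3 pages
    long rawAdvance = pagesToSkipOver * 512;           // 1536 blob bytes
    long dataSkipped = pagesToSkipOver * 510;          // 1530 data bytes
    long leftover = n - dataSkipped;                   // 470, via next buffer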
+
+ /**
+ * Skip over n bytes within the current buffer, or skip the whole
+ * remaining buffer if n is greater than the bytes remaining in it.
+ * @param n The number of data bytes to skip.
+ * @return The number of bytes actually skipped.
+ * @throws IOException if data corruption found in the buffer.
+ */
+ private long skipWithinBuffer(long n) throws IOException {
+ if (!dataAvailableInBuffer()) {
+ return 0;
+ }
+ long skipped = 0;
+ // First skip within the current page.
+ skipped = skipWithinCurrentPage(n);
+ if (skipped > n) {
+ throw new AssertionError(String.format(
+ "Bug in skipWithinCurrentPage: it skipped over %d bytes when asked"
+ + " to skip %d bytes.", skipped, n));
+ }
+ n -= skipped;
+ if (n == 0 || !dataAvailableInBuffer()) {
+ return skipped;
+ }
+
+ // Calculate how many whole pages (pages before the possibly partially
+ // filled last page) remain.
+ int currentPageIndex = currentOffsetInBuffer / PAGE_SIZE;
+ int numberOfPagesInBuffer = currentBuffer.length / PAGE_SIZE;
+ int wholePagesRemaining = numberOfPagesInBuffer - currentPageIndex - 1;
+
+ if (n < (PAGE_DATA_SIZE * wholePagesRemaining)) {
+ // I'm within one of the whole pages remaining, skip in there.
+ advancePagesInBuffer((int) (n / PAGE_DATA_SIZE));
+ currentOffsetInBuffer += n % PAGE_DATA_SIZE;
+ return n + skipped;
+ }
+
+ // Skip over the whole pages.
+ advancePagesInBuffer(wholePagesRemaining);
+ skipped += wholePagesRemaining * PAGE_DATA_SIZE;
+ n -= wholePagesRemaining * PAGE_DATA_SIZE;
+
+ // At this point we know we need to skip to somewhere in the last page,
+ // or just go to the end.
+ return skipWithinCurrentPage(n) + skipped;
+ }
+
+ /**
+ * Skip over n bytes within the current page, or skip the rest of the
+ * page if n is greater than the bytes remaining in the page.
+ * @param n The number of data bytes to skip.
+ * @return The number of bytes actually skipped.
+ * @throws IOException if data corruption found in the buffer.
+ */
+ private long skipWithinCurrentPage(long n) throws IOException {
+ int remainingBytesInCurrentPage = getBytesRemainingInCurrentPage();
+ if (n < remainingBytesInCurrentPage) {
+ currentOffsetInBuffer += n;
+ return n;
+ } else {
+ advancePagesInBuffer(1);
+ return remainingBytesInCurrentPage;
+ }
+ }
+
+ /**
+ * Gets the number of bytes remaining within the current page in the buffer.
+ * @return The number of bytes remaining.
+ * @throws IOException if data corruption found in the buffer.
+ */
+ private int getBytesRemainingInCurrentPage() throws IOException {
+ if (!dataAvailableInBuffer()) {
+ return 0;
+ }
+ // Calculate our current position relative to the start of the current
+ // page.
+ int currentDataOffsetInPage =
+ (currentOffsetInBuffer % PAGE_SIZE) - PAGE_HEADER_SIZE;
+ int pageBoundary = getCurrentPageStartInBuffer();
+ // Get the data size of the current page from the header.
+ short sizeOfCurrentPage = getPageSize(blob, currentBuffer, pageBoundary);
+ return sizeOfCurrentPage - currentDataOffsetInPage;
+ }
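For example, with hypothetical offsets: at currentOffsetInBuffer = 700 the current page starts at byte 512 of the buffer, the position within that page's data is 700 % 512 - 2 = 186, and if the page header reads 510 the method returns 510 - 186 = 324.

    // Hypothetical offsets mirroring getBytesRemainingInCurrentPage():
    int offsetInBuffer = 700;
    int pageBoundary = 512 * (offsetInBuffer / 512);     // 512
    int dataOffsetInPage = offsetInBuffer % 512 - 2;     // 186
    int remaining = 510 - dataOffsetInPage;              // 324 if header == 510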
+
+ private static IOException badStartRangeException(CloudPageBlobWrapper blob,
+ PageRange startRange) {
+ return fileCorruptException(blob, String.format(
+ "Page blobs for ASV should always use a page range starting at byte 0. "
+ + "This starts at byte %d.",
+ startRange.getStartOffset()));
+ }
+
+ private void advancePagesInBuffer(int numberOfPages) {
+ currentOffsetInBuffer =
+ getCurrentPageStartInBuffer()
+ + (numberOfPages * PAGE_SIZE)
+ + PAGE_HEADER_SIZE;
+ }
+
+ private int getCurrentPageStartInBuffer() {
+ return PAGE_SIZE * (currentOffsetInBuffer / PAGE_SIZE);
+ }
+
+ private static IOException fileCorruptException(CloudPageBlobWrapper blob,
+ String reason) {
+ return new IOException(String.format(
+ "The page blob: '%s' is corrupt or has an unexpected format: %s.",
+ blob.getUri(), reason));
+ }
+}