You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2015/06/09 07:49:11 UTC
[2/2] hadoop git commit: HADOOP-12073. Azure FileSystem
PageBlobInputStream does not return -1 on EOF. Contributed by Ivan Mitic.
HADOOP-12073. Azure FileSystem PageBlobInputStream does not return -1 on EOF. Contributed by Ivan Mitic.
(cherry picked from commit c45784bc9031353b938f4756473937cca759b3dc)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f5b0cce7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f5b0cce7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f5b0cce7
Branch: refs/heads/branch-2
Commit: f5b0cce7faaee32fbb0f8f2cec233ff178f208ea
Parents: 116a720
Author: cnauroth <cn...@apache.org>
Authored: Mon Jun 8 22:42:14 2015 -0700
Committer: cnauroth <cn...@apache.org>
Committed: Mon Jun 8 22:42:24 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../fs/azure/AzureNativeFileSystemStore.java | 2 +-
.../hadoop/fs/azure/PageBlobInputStream.java | 32 +++++--
.../hadoop/fs/azure/PageBlobOutputStream.java | 10 ++-
.../fs/azure/NativeAzureFileSystemBaseTest.java | 79 ++++++++++++++++-
...tiveAzureFileSystemContractPageBlobLive.java | 90 ++++++++++++++++++++
6 files changed, 204 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5b0cce7/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 7bfc5fa..d16262a 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -356,6 +356,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12054. RPC client should not retry for InvalidToken exceptions.
(Varun Saxena via Arpit Agarwal)
+ HADOOP-12073. Azure FileSystem PageBlobInputStream does not return -1 on
+ EOF. (Ivan Mitic via cnauroth)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5b0cce7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 69bda06..7741f17 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -2301,7 +2301,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
throws AzureException {
if (blob instanceof CloudPageBlobWrapper) {
try {
- return PageBlobInputStream.getPageBlobSize((CloudPageBlobWrapper) blob,
+ return PageBlobInputStream.getPageBlobDataSize((CloudPageBlobWrapper) blob,
getInstrumentedContext(
isConcurrentOOBAppendAllowed()));
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5b0cce7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
index 468ac65..097201b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
@@ -80,7 +80,7 @@ final class PageBlobInputStream extends InputStream {
* @throws IOException If the format is corrupt.
* @throws StorageException If anything goes wrong in the requests.
*/
- public static long getPageBlobSize(CloudPageBlobWrapper blob,
+ public static long getPageBlobDataSize(CloudPageBlobWrapper blob,
OperationContext opContext) throws IOException, StorageException {
// Get the page ranges for the blob. There should be one range starting
// at byte 0, but we tolerate (and ignore) ranges after the first one.
@@ -156,7 +156,7 @@ final class PageBlobInputStream extends InputStream {
}
if (pageBlobSize == -1) {
try {
- pageBlobSize = getPageBlobSize(blob, opContext);
+ pageBlobSize = getPageBlobDataSize(blob, opContext);
} catch (StorageException e) {
throw new IOException("Unable to get page blob size.", e);
}
@@ -179,7 +179,13 @@ final class PageBlobInputStream extends InputStream {
/**
* Check our buffer and download more from the server if needed.
- * @return true if there's more data in the buffer, false if we're done.
+ * If data is not available in the buffer, method downloads maximum
+ * page blob download size (4MB) or if there is less than 4MB left,
+ * all remaining pages.
+ * If we are on the last page, method will return true even if
+ * we reached the end of stream.
+ * @return true if there's more data in the buffer, false if buffer is empty
+ * and we reached the end of the blob.
* @throws IOException
*/
private synchronized boolean ensureDataInBuffer() throws IOException {
@@ -257,11 +263,15 @@ final class PageBlobInputStream extends InputStream {
@Override
public synchronized int read(byte[] outputBuffer, int offset, int len)
throws IOException {
+ // If len is zero return 0 per the InputStream contract
+ if (len == 0) {
+ return 0;
+ }
+
int numberOfBytesRead = 0;
while (len > 0) {
if (!ensureDataInBuffer()) {
- filePosition += numberOfBytesRead;
- return numberOfBytesRead;
+ break;
}
int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage);
@@ -277,6 +287,13 @@ final class PageBlobInputStream extends InputStream {
currentOffsetInBuffer += numBytesToRead;
}
}
+
+ // if outputBuffer len is > 0 and zero bytes were read, we reached
+ // EOF
+ if (numberOfBytesRead == 0) {
+ return -1;
+ }
+
filePosition += numberOfBytesRead;
return numberOfBytesRead;
}
@@ -284,8 +301,9 @@ final class PageBlobInputStream extends InputStream {
@Override
public int read() throws IOException {
byte[] oneByte = new byte[1];
- if (read(oneByte) == 0) {
- return -1;
+ int result = read(oneByte);
+ if (result < 0) {
+ return result;
}
return oneByte[0];
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5b0cce7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
index 2b8846c..8689375 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
@@ -117,6 +117,8 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
// The last task given to the ioThreadPool to execute, to allow
// waiting until it's done.
private WriteRequest lastQueuedTask;
+ // Whether the stream has been closed.
+ private boolean closed = false;
public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class);
@@ -201,7 +203,11 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
* service.
*/
@Override
- public void close() throws IOException {
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
LOG.debug("Closing page blob output stream.");
flush();
checkStreamState();
@@ -221,7 +227,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
Thread.currentThread().interrupt();
}
- this.lastError = new IOException("Stream is already closed.");
+ closed = true;
}
// Log the stacks of all threads.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5b0cce7/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
index 9ce6cc9..6989a70 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
@@ -41,7 +41,6 @@ import java.util.TimeZone;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -54,7 +53,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-
import org.apache.hadoop.fs.azure.AzureException;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
@@ -473,6 +471,83 @@ public abstract class NativeAzureFileSystemBaseTest {
}
@Test
+ public void testInputStreamReadWithZeroSizeBuffer() throws Exception {
+ Path newFile = new Path("zeroSizeRead");
+ OutputStream output = fs.create(newFile);
+ output.write(10);
+ output.close();
+
+ InputStream input = fs.open(newFile);
+ int result = input.read(new byte[2], 0, 0);
+ assertEquals(0, result);
+ }
+
+ @Test
+ public void testInputStreamReadWithBufferReturnsMinusOneOnEof() throws Exception {
+ Path newFile = new Path("eofRead");
+ OutputStream output = fs.create(newFile);
+ output.write(10);
+ output.close();
+
+ // Read first byte back
+ InputStream input = fs.open(newFile);
+ byte[] buff = new byte[1];
+ int result = input.read(buff, 0, 1);
+ assertEquals(1, result);
+ assertEquals(10, buff[0]);
+
+ // Issue another read and make sure it returns -1
+ buff[0] = 2;
+ result = input.read(buff, 0, 1);
+ assertEquals(-1, result);
+ // Buffer is intact
+ assertEquals(2, buff[0]);
+ }
+
+ @Test
+ public void testInputStreamReadWithBufferReturnsMinusOneOnEofForLargeBuffer() throws Exception {
+ Path newFile = new Path("eofRead2");
+ OutputStream output = fs.create(newFile);
+ byte[] outputBuff = new byte[97331];
+ for(int i = 0; i < outputBuff.length; ++i) {
+ outputBuff[i] = (byte)(Math.random() * 255);
+ }
+ output.write(outputBuff);
+ output.close();
+
+ // Read the content of the file
+ InputStream input = fs.open(newFile);
+ byte[] buff = new byte[131072];
+ int result = input.read(buff, 0, buff.length);
+ assertEquals(outputBuff.length, result);
+ for(int i = 0; i < outputBuff.length; ++i) {
+ assertEquals(outputBuff[i], buff[i]);
+ }
+
+ // Issue another read and make sure it returns -1
+ buff = new byte[131072];
+ result = input.read(buff, 0, buff.length);
+ assertEquals(-1, result);
+ }
+
+ @Test
+ public void testInputStreamReadIntReturnsMinusOneOnEof() throws Exception {
+ Path newFile = new Path("eofRead3");
+ OutputStream output = fs.create(newFile);
+ output.write(10);
+ output.close();
+
+ // Read first byte back
+ InputStream input = fs.open(newFile);
+ int value = input.read();
+ assertEquals(10, value);
+
+ // Issue another read and make sure it returns -1
+ value = input.read();
+ assertEquals(-1, value);
+ }
+
+ @Test
public void testSetPermissionOnFile() throws Exception {
Path newFile = new Path("testPermission");
OutputStream output = fs.create(newFile);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5b0cce7/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemContractPageBlobLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemContractPageBlobLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemContractPageBlobLive.java
new file mode 100644
index 0000000..3c3b782
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemContractPageBlobLive.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.junit.Ignore;
+
+public class TestNativeAzureFileSystemContractPageBlobLive extends
+ FileSystemContractBaseTest {
+ private AzureBlobStorageTestAccount testAccount;
+
+ private AzureBlobStorageTestAccount createTestAccount()
+ throws Exception {
+ Configuration conf = new Configuration();
+
+ // Configure the page blob directories key so every file created is a page blob.
+ conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+ // Configure the atomic rename directories key so every folder will have
+ // atomic rename applied.
+ conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+ return AzureBlobStorageTestAccount.create(conf);
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ testAccount = createTestAccount();
+ if (testAccount != null) {
+ fs = testAccount.getFileSystem();
+ }
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ if (testAccount != null) {
+ testAccount.cleanup();
+ testAccount = null;
+ fs = null;
+ }
+ }
+
+ @Override
+ protected void runTest() throws Throwable {
+ if (testAccount != null) {
+ super.runTest();
+ }
+ }
+
+ /**
+ * The following tests are failing on Azure and the Azure
+ * file system code needs to be modified to make them pass.
+ * A separate work item has been opened for this.
+ */
+ @Ignore
+ public void testMoveFileUnderParent() throws Throwable {
+ }
+
+ @Ignore
+ public void testRenameFileToSelf() throws Throwable {
+ }
+
+ @Ignore
+ public void testRenameChildDirForbidden() throws Exception {
+ }
+
+ @Ignore
+ public void testMoveDirUnderParent() throws Throwable {
+ }
+
+ @Ignore
+ public void testRenameDirToSelf() throws Throwable {
+ }
+}