You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/18 19:16:57 UTC
[46/50] hadoop git commit: HDFS-8352. Erasure Coding: test webhdfs
read write stripe file. (waltersu4549)
HDFS-8352. Erasure Coding: test webhdfs read write stripe file. (waltersu4549)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/84562b44
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/84562b44
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/84562b44
Branch: refs/heads/HDFS-7285
Commit: 84562b444dd2eebe1fd779a1b537b65b95c9e541
Parents: 1437ab8
Author: waltersu4549 <wa...@apache.org>
Authored: Mon May 18 19:10:37 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 18 10:02:03 2015 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/TestWriteReadStripedFile.java | 267 ++++++++++---------
1 file changed, 148 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84562b44/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
index 57d6eb9..f78fb7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -21,9 +21,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
+import org.apache.hadoop.hdfs.web.WebHdfsConstants;
+import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -33,23 +37,26 @@ import org.junit.Test;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Random;
public class TestWriteReadStripedFile {
private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
-
- private static DistributedFileSystem fs;
private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final static int stripesPerBlock = 4;
static int blockSize = cellSize * stripesPerBlock;
static int numDNs = dataBlocks + parityBlocks + 2;
private static MiniDFSCluster cluster;
+ private static Configuration conf;
+ private static FileSystem fs;
+
+ private static Random r= new Random();
@BeforeClass
public static void setup() throws IOException {
- Configuration conf = new Configuration();
+ conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
@@ -134,7 +141,7 @@ public class TestWriteReadStripedFile {
@Test
public void testFileMoreThanABlockGroup2() throws IOException {
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
- blockSize * dataBlocks + cellSize+ 123);
+ blockSize * dataBlocks + cellSize + 123);
}
@@ -171,7 +178,7 @@ public class TestWriteReadStripedFile {
}
private void assertSeekAndRead(FSDataInputStream fsdis, int pos,
- int writeBytes) throws IOException {
+ int writeBytes) throws IOException {
fsdis.seek(pos);
byte[] buf = new byte[writeBytes];
int readLen = readAll(fsdis, buf);
@@ -182,147 +189,169 @@ public class TestWriteReadStripedFile {
}
}
- private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
+ private void testOneFileUsingDFSStripedInputStream(String src, int fileLength)
throws IOException {
- Path testPath = new Path(src);
- final byte[] bytes = generateBytes(writeBytes);
- DFSTestUtil.writeFile(fs, testPath, new String(bytes));
- //check file length
- FileStatus status = fs.getFileStatus(testPath);
- long fileLength = status.getLen();
+ final byte[] expected = generateBytes(fileLength);
+ Path srcPath = new Path(src);
+ DFSTestUtil.writeFile(fs, srcPath, new String(expected));
+
+ verifyLength(fs, srcPath, fileLength);
+
+ byte[] smallBuf = new byte[1024];
+ byte[] largeBuf = new byte[fileLength + 100];
+ verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+
+ verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
+ verifySeek(fs, srcPath, fileLength);
+ verifyStatefulRead(fs, srcPath, fileLength, expected,
+ ByteBuffer.allocate(fileLength + 100));
+ verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
+ verifyStatefulRead(fs, srcPath, fileLength, expected,
+ ByteBuffer.allocate(1024));
+ }
+
+ @Test
+ public void testWriteReadUsingWebHdfs() throws Exception {
+ int fileLength = blockSize * dataBlocks + cellSize + 123;
+
+ final byte[] expected = generateBytes(fileLength);
+ FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
+ WebHdfsConstants.WEBHDFS_SCHEME);
+ Path srcPath = new Path("/testWriteReadUsingWebHdfs_stripe");
+ DFSTestUtil.writeFile(fs, srcPath, new String(expected));
+
+ verifyLength(fs, srcPath, fileLength);
+
+ byte[] smallBuf = new byte[1024];
+ byte[] largeBuf = new byte[fileLength + 100];
+ verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+
+ verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
+ verifySeek(fs, srcPath, fileLength);
+ verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
+ //webhdfs doesn't support bytebuffer read
+
+ }
+
+ void verifyLength(FileSystem fs, Path srcPath, int fileLength)
+ throws IOException {
+ FileStatus status = fs.getFileStatus(srcPath);
Assert.assertEquals("File length should be the same",
- writeBytes, fileLength);
+ fileLength, status.getLen());
+ }
- // pread
- try (FSDataInputStream fsdis = fs.open(new Path(src))) {
- byte[] buf = new byte[writeBytes + 100];
- int readLen = fsdis.read(0, buf, 0, buf.length);
- readLen = readLen >= 0 ? readLen : 0;
- Assert.assertEquals("The length of file should be the same to write size",
- writeBytes, readLen);
- for (int i = 0; i < writeBytes; i++) {
- Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
- buf[i]);
+ void verifyPread(FileSystem fs, Path srcPath, int fileLength,
+ byte[] expected, byte[] buf) throws IOException {
+ FSDataInputStream in = fs.open(srcPath);
+ int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102,
+ cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102,
+ cellSize * dataBlocks, fileLength - 102, fileLength - 1};
+ for (int startOffset : startOffsets) {
+ startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
+ int remaining = fileLength - startOffset;
+ in.readFully(startOffset, buf, 0, remaining);
+ for (int i = 0; i < remaining; i++) {
+ Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
+ "same",
+ expected[startOffset + i], buf[i]);
}
}
+ in.close();
+ }
- // stateful read with byte array
- try (FSDataInputStream fsdis = fs.open(new Path(src))) {
- byte[] buf = new byte[writeBytes + 100];
- int readLen = readAll(fsdis, buf);
- Assert.assertEquals("The length of file should be the same to write size",
- writeBytes, readLen);
- for (int i = 0; i < writeBytes; i++) {
- Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
- buf[i]);
+ void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
+ byte[] expected, byte[] buf) throws IOException {
+ FSDataInputStream in = fs.open(srcPath);
+ final byte[] result = new byte[fileLength];
+ int readLen = 0;
+ int ret;
+ do {
+ ret = in.read(buf, 0, buf.length);
+ if (ret > 0) {
+ System.arraycopy(buf, 0, result, readLen, ret);
+ readLen += ret;
}
- }
+ } while (ret >= 0);
+ Assert.assertEquals("The length of file should be the same to write size",
+ fileLength, readLen);
+ Assert.assertArrayEquals(expected, result);
+ in.close();
+ }
- // seek and stateful read
- try (FSDataInputStream fsdis = fs.open(new Path(src))) {
- // seek to 1/2 of content
- int pos = writeBytes/2;
- assertSeekAndRead(fsdis, pos, writeBytes);
- // seek to 1/3 of content
- pos = writeBytes/3;
- assertSeekAndRead(fsdis, pos, writeBytes);
+ void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
+ byte[] expected, ByteBuffer buf) throws IOException {
+ FSDataInputStream in = fs.open(srcPath);
+ ByteBuffer result = ByteBuffer.allocate(fileLength);
+ int readLen = 0;
+ int ret;
+ do {
+ ret = in.read(buf);
+ if (ret > 0) {
+ readLen += ret;
+ buf.flip();
+ result.put(buf);
+ buf.clear();
+ }
+ } while (ret >= 0);
+ readLen = readLen >= 0 ? readLen : 0;
+ Assert.assertEquals("The length of file should be the same to write size",
+ fileLength, readLen);
+ Assert.assertArrayEquals(expected, result.array());
+ in.close();
+ }
- // seek to 0 pos
- pos = 0;
- assertSeekAndRead(fsdis, pos, writeBytes);
- if (writeBytes > cellSize) {
- // seek to cellSize boundary
- pos = cellSize -1;
- assertSeekAndRead(fsdis, pos, writeBytes);
- }
+ void verifySeek(FileSystem fs, Path srcPath, int fileLength)
+ throws IOException {
+ FSDataInputStream in = fs.open(srcPath);
+ // seek to 1/2 of content
+ int pos = fileLength / 2;
+ assertSeekAndRead(in, pos, fileLength);
+
+ // seek to 1/3 of content
+ pos = fileLength / 3;
+ assertSeekAndRead(in, pos, fileLength);
+
+ // seek to 0 pos
+ pos = 0;
+ assertSeekAndRead(in, pos, fileLength);
+
+ if (fileLength > cellSize) {
+ // seek to cellSize boundary
+ pos = cellSize - 1;
+ assertSeekAndRead(in, pos, fileLength);
+ }
- if (writeBytes > cellSize * dataBlocks) {
- // seek to striped cell group boundary
- pos = cellSize * dataBlocks - 1;
- assertSeekAndRead(fsdis, pos, writeBytes);
- }
+ if (fileLength > cellSize * dataBlocks) {
+ // seek to striped cell group boundary
+ pos = cellSize * dataBlocks - 1;
+ assertSeekAndRead(in, pos, fileLength);
+ }
- if (writeBytes > blockSize * dataBlocks) {
- // seek to striped block group boundary
- pos = blockSize * dataBlocks - 1;
- assertSeekAndRead(fsdis, pos, writeBytes);
- }
+ if (fileLength > blockSize * dataBlocks) {
+ // seek to striped block group boundary
+ pos = blockSize * dataBlocks - 1;
+ assertSeekAndRead(in, pos, fileLength);
+ }
+ if(!(in.getWrappedStream() instanceof ByteRangeInputStream)){
try {
- fsdis.seek(-1);
+ in.seek(-1);
Assert.fail("Should be failed if seek to negative offset");
} catch (EOFException e) {
// expected
}
try {
- fsdis.seek(writeBytes + 1);
+ in.seek(fileLength + 1);
Assert.fail("Should be failed if seek after EOF");
} catch (EOFException e) {
// expected
}
}
-
- // stateful read with ByteBuffer
- try (FSDataInputStream fsdis = fs.open(new Path(src))) {
- ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
- int readLen = 0;
- int ret;
- do {
- ret = fsdis.read(buf);
- if (ret > 0) {
- readLen += ret;
- }
- } while (ret >= 0);
- readLen = readLen >= 0 ? readLen : 0;
- Assert.assertEquals("The length of file should be the same to write size",
- writeBytes, readLen);
- for (int i = 0; i < writeBytes; i++) {
- Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
- buf.array()[i]);
- }
- }
-
- // stateful read with 1KB size byte array
- try (FSDataInputStream fsdis = fs.open(new Path(src))) {
- final byte[] result = new byte[writeBytes];
- final byte[] buf = new byte[1024];
- int readLen = 0;
- int ret;
- do {
- ret = fsdis.read(buf, 0, buf.length);
- if (ret > 0) {
- System.arraycopy(buf, 0, result, readLen, ret);
- readLen += ret;
- }
- } while (ret >= 0);
- Assert.assertEquals("The length of file should be the same to write size",
- writeBytes, readLen);
- Assert.assertArrayEquals(bytes, result);
- }
-
- // stateful read using ByteBuffer with 1KB size
- try (FSDataInputStream fsdis = fs.open(new Path(src))) {
- final ByteBuffer result = ByteBuffer.allocate(writeBytes);
- final ByteBuffer buf = ByteBuffer.allocate(1024);
- int readLen = 0;
- int ret;
- do {
- ret = fsdis.read(buf);
- if (ret > 0) {
- readLen += ret;
- buf.flip();
- result.put(buf);
- buf.clear();
- }
- } while (ret >= 0);
- Assert.assertEquals("The length of file should be the same to write size",
- writeBytes, readLen);
- Assert.assertArrayEquals(bytes, result.array());
- }
+ in.close();
}
@Test