Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/18 19:16:57 UTC

[46/50] hadoop git commit: HDFS-8352. Erasure Coding: test webhdfs read write stripe file. (waltersu4549)

HDFS-8352. Erasure Coding: test webhdfs read write stripe file. (waltersu4549)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/84562b44
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/84562b44
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/84562b44

Branch: refs/heads/HDFS-7285
Commit: 84562b444dd2eebe1fd779a1b537b65b95c9e541
Parents: 1437ab8
Author: waltersu4549 <wa...@apache.org>
Authored: Mon May 18 19:10:37 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 18 10:02:03 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/TestWriteReadStripedFile.java   | 267 ++++++++++---------
 1 file changed, 148 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
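
For context, a minimal sketch (not part of the commit) of the read/write pattern the new
testWriteReadUsingWebHdfs case exercises: it assumes a MiniDFSCluster is already running
with an erasure coding zone created at "/", as in the test's @BeforeClass setup; the class
name WebHdfsStripedReadSketch and the method name are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.junit.Assert;

public class WebHdfsStripedReadSketch {
  static void writeAndReadBack(Configuration conf, byte[] expected) throws Exception {
    // Access the striped file through the webhdfs:// scheme rather than hdfs://.
    FileSystem webFs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    Path srcPath = new Path("/testWriteReadUsingWebHdfs_stripe");
    DFSTestUtil.writeFile(webFs, srcPath, new String(expected));

    // Positional read of the whole file, then byte-for-byte comparison.
    byte[] buf = new byte[expected.length];
    try (FSDataInputStream in = webFs.open(srcPath)) {
      in.readFully(0, buf, 0, buf.length);
    }
    Assert.assertArrayEquals(expected, buf);
  }
}
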


http://git-wip-us.apache.org/repos/asf/hadoop/blob/84562b44/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
index 57d6eb9..f78fb7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -21,9 +21,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
+import org.apache.hadoop.hdfs.web.WebHdfsConstants;
+import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -33,23 +37,26 @@ import org.junit.Test;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Random;
 
 public class TestWriteReadStripedFile {
   private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
   private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
 
-
-  private static DistributedFileSystem fs;
   private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
   private final static int stripesPerBlock = 4;
   static int blockSize = cellSize * stripesPerBlock;
   static int numDNs = dataBlocks + parityBlocks + 2;
 
   private static MiniDFSCluster cluster;
+  private static Configuration conf;
+  private static FileSystem fs;
+
+  private static Random r = new Random();
 
   @BeforeClass
   public static void setup() throws IOException {
-    Configuration conf = new Configuration();
+    conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
@@ -134,7 +141,7 @@ public class TestWriteReadStripedFile {
   @Test
   public void testFileMoreThanABlockGroup2() throws IOException {
     testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
-        blockSize * dataBlocks + cellSize+ 123);
+        blockSize * dataBlocks + cellSize + 123);
   }
 
 
@@ -171,7 +178,7 @@ public class TestWriteReadStripedFile {
   }
 
   private void assertSeekAndRead(FSDataInputStream fsdis, int pos,
-      int writeBytes) throws IOException {
+                                 int writeBytes) throws IOException {
     fsdis.seek(pos);
     byte[] buf = new byte[writeBytes];
     int readLen = readAll(fsdis, buf);
@@ -182,147 +189,169 @@ public class TestWriteReadStripedFile {
     }
   }
 
-  private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
+  private void testOneFileUsingDFSStripedInputStream(String src, int fileLength)
       throws IOException {
-    Path testPath = new Path(src);
-    final byte[] bytes = generateBytes(writeBytes);
-    DFSTestUtil.writeFile(fs, testPath, new String(bytes));
 
-    //check file length
-    FileStatus status = fs.getFileStatus(testPath);
-    long fileLength = status.getLen();
+    final byte[] expected = generateBytes(fileLength);
+    Path srcPath = new Path(src);
+    DFSTestUtil.writeFile(fs, srcPath, new String(expected));
+
+    verifyLength(fs, srcPath, fileLength);
+
+    byte[] smallBuf = new byte[1024];
+    byte[] largeBuf = new byte[fileLength + 100];
+    verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+
+    verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
+    verifySeek(fs, srcPath, fileLength);
+    verifyStatefulRead(fs, srcPath, fileLength, expected,
+        ByteBuffer.allocate(fileLength + 100));
+    verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
+    verifyStatefulRead(fs, srcPath, fileLength, expected,
+        ByteBuffer.allocate(1024));
+  }
+
+  @Test
+  public void testWriteReadUsingWebHdfs() throws Exception {
+    int fileLength = blockSize * dataBlocks + cellSize + 123;
+
+    final byte[] expected = generateBytes(fileLength);
+    FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
+        WebHdfsConstants.WEBHDFS_SCHEME);
+    Path srcPath = new Path("/testWriteReadUsingWebHdfs_stripe");
+    DFSTestUtil.writeFile(fs, srcPath, new String(expected));
+
+    verifyLength(fs, srcPath, fileLength);
+
+    byte[] smallBuf = new byte[1024];
+    byte[] largeBuf = new byte[fileLength + 100];
+    verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+
+    verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
+    verifySeek(fs, srcPath, fileLength);
+    verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
+    // webhdfs doesn't support ByteBuffer read
+
+  }
+
+  void verifyLength(FileSystem fs, Path srcPath, int fileLength)
+      throws IOException {
+    FileStatus status = fs.getFileStatus(srcPath);
     Assert.assertEquals("File length should be the same",
-        writeBytes, fileLength);
+        fileLength, status.getLen());
+  }
 
-    // pread
-    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
-      byte[] buf = new byte[writeBytes + 100];
-      int readLen = fsdis.read(0, buf, 0, buf.length);
-      readLen = readLen >= 0 ? readLen : 0;
-      Assert.assertEquals("The length of file should be the same to write size",
-          writeBytes, readLen);
-      for (int i = 0; i < writeBytes; i++) {
-        Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
-            buf[i]);
+  void verifyPread(FileSystem fs, Path srcPath, int fileLength,
+                   byte[] expected, byte[] buf) throws IOException {
+    FSDataInputStream in = fs.open(srcPath);
+    int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102,
+        cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102,
+        cellSize * dataBlocks, fileLength - 102, fileLength - 1};
+    for (int startOffset : startOffsets) {
+      startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
+      int remaining = fileLength - startOffset;
+      in.readFully(startOffset, buf, 0, remaining);
+      for (int i = 0; i < remaining; i++) {
+        Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
+                "same",
+            expected[startOffset + i], buf[i]);
       }
     }
+    in.close();
+  }
 
-    // stateful read with byte array
-    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
-      byte[] buf = new byte[writeBytes + 100];
-      int readLen = readAll(fsdis, buf);
-      Assert.assertEquals("The length of file should be the same to write size",
-          writeBytes, readLen);
-      for (int i = 0; i < writeBytes; i++) {
-        Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
-            buf[i]);
+  void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
+                          byte[] expected, byte[] buf) throws IOException {
+    FSDataInputStream in = fs.open(srcPath);
+    final byte[] result = new byte[fileLength];
+    int readLen = 0;
+    int ret;
+    do {
+      ret = in.read(buf, 0, buf.length);
+      if (ret > 0) {
+        System.arraycopy(buf, 0, result, readLen, ret);
+        readLen += ret;
       }
-    }
+    } while (ret >= 0);
+    Assert.assertEquals("The length of file should be the same to write size",
+        fileLength, readLen);
+    Assert.assertArrayEquals(expected, result);
+    in.close();
+  }
 
-    // seek and stateful read
-    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
-      // seek to 1/2 of content
-      int pos = writeBytes/2;
-      assertSeekAndRead(fsdis, pos, writeBytes);
 
-      // seek to 1/3 of content
-      pos = writeBytes/3;
-      assertSeekAndRead(fsdis, pos, writeBytes);
+  void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
+                          byte[] expected, ByteBuffer buf) throws IOException {
+    FSDataInputStream in = fs.open(srcPath);
+    ByteBuffer result = ByteBuffer.allocate(fileLength);
+    int readLen = 0;
+    int ret;
+    do {
+      ret = in.read(buf);
+      if (ret > 0) {
+        readLen += ret;
+        buf.flip();
+        result.put(buf);
+        buf.clear();
+      }
+    } while (ret >= 0);
+    readLen = readLen >= 0 ? readLen : 0;
+    Assert.assertEquals("The length of file should be the same to write size",
+        fileLength, readLen);
+    Assert.assertArrayEquals(expected, result.array());
+    in.close();
+  }
 
-      // seek to 0 pos
-      pos = 0;
-      assertSeekAndRead(fsdis, pos, writeBytes);
 
-      if (writeBytes > cellSize) {
-        // seek to cellSize boundary
-        pos = cellSize -1;
-        assertSeekAndRead(fsdis, pos, writeBytes);
-      }
+  void verifySeek(FileSystem fs, Path srcPath, int fileLength)
+      throws IOException {
+    FSDataInputStream in = fs.open(srcPath);
+    // seek to 1/2 of content
+    int pos = fileLength / 2;
+    assertSeekAndRead(in, pos, fileLength);
+
+    // seek to 1/3 of content
+    pos = fileLength / 3;
+    assertSeekAndRead(in, pos, fileLength);
+
+    // seek to 0 pos
+    pos = 0;
+    assertSeekAndRead(in, pos, fileLength);
+
+    if (fileLength > cellSize) {
+      // seek to cellSize boundary
+      pos = cellSize - 1;
+      assertSeekAndRead(in, pos, fileLength);
+    }
 
-      if (writeBytes > cellSize * dataBlocks) {
-        // seek to striped cell group boundary
-        pos = cellSize * dataBlocks - 1;
-        assertSeekAndRead(fsdis, pos, writeBytes);
-      }
+    if (fileLength > cellSize * dataBlocks) {
+      // seek to striped cell group boundary
+      pos = cellSize * dataBlocks - 1;
+      assertSeekAndRead(in, pos, fileLength);
+    }
 
-      if (writeBytes > blockSize * dataBlocks) {
-        // seek to striped block group boundary
-        pos = blockSize * dataBlocks - 1;
-        assertSeekAndRead(fsdis, pos, writeBytes);
-      }
+    if (fileLength > blockSize * dataBlocks) {
+      // seek to striped block group boundary
+      pos = blockSize * dataBlocks - 1;
+      assertSeekAndRead(in, pos, fileLength);
+    }
 
+    if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) {
       try {
-        fsdis.seek(-1);
+        in.seek(-1);
         Assert.fail("Should be failed if seek to negative offset");
       } catch (EOFException e) {
         // expected
       }
 
       try {
-        fsdis.seek(writeBytes + 1);
+        in.seek(fileLength + 1);
         Assert.fail("Should be failed if seek after EOF");
       } catch (EOFException e) {
         // expected
       }
     }
-
-    // stateful read with ByteBuffer
-    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
-      ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
-      int readLen = 0;
-      int ret;
-      do {
-        ret = fsdis.read(buf);
-        if (ret > 0) {
-          readLen += ret;
-        }
-      } while (ret >= 0);
-      readLen = readLen >= 0 ? readLen : 0;
-      Assert.assertEquals("The length of file should be the same to write size",
-          writeBytes, readLen);
-      for (int i = 0; i < writeBytes; i++) {
-        Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
-            buf.array()[i]);
-      }
-    }
-
-    // stateful read with 1KB size byte array
-    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
-      final byte[] result = new byte[writeBytes];
-      final byte[] buf = new byte[1024];
-      int readLen = 0;
-      int ret;
-      do {
-        ret = fsdis.read(buf, 0, buf.length);
-        if (ret > 0) {
-          System.arraycopy(buf, 0, result, readLen, ret);
-          readLen += ret;
-        }
-      } while (ret >= 0);
-      Assert.assertEquals("The length of file should be the same to write size",
-          writeBytes, readLen);
-      Assert.assertArrayEquals(bytes, result);
-    }
-
-    // stateful read using ByteBuffer with 1KB size
-    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
-      final ByteBuffer result = ByteBuffer.allocate(writeBytes);
-      final ByteBuffer buf = ByteBuffer.allocate(1024);
-      int readLen = 0;
-      int ret;
-      do {
-        ret = fsdis.read(buf);
-        if (ret > 0) {
-          readLen += ret;
-          buf.flip();
-          result.put(buf);
-          buf.clear();
-        }
-      } while (ret >= 0);
-      Assert.assertEquals("The length of file should be the same to write size",
-          writeBytes, readLen);
-      Assert.assertArrayEquals(bytes, result.array());
-    }
+    in.close();
   }
 
   @Test