You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/07 02:10:12 UTC
hadoop git commit: HDFS-8334. Erasure coding: rename
DFSStripedInputStream related test classes. Contributed by Zhe Zhang.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7285 88e6c4229 -> ca32b8f12
HDFS-8334. Erasure coding: rename DFSStripedInputStream related test classes. Contributed by Zhe Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ca32b8f1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ca32b8f1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ca32b8f1
Branch: refs/heads/HDFS-7285
Commit: ca32b8f12cbed92d1fa2366a579aa8c86ea50f83
Parents: 88e6c42
Author: Zhe Zhang <zh...@apache.org>
Authored: Wed May 6 15:34:37 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Wed May 6 17:09:44 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 5 +
.../hadoop/hdfs/TestDFSStripedInputStream.java | 365 ++++++++-----------
.../apache/hadoop/hdfs/TestReadStripedFile.java | 218 -----------
.../hadoop/hdfs/TestWriteReadStripedFile.java | 261 +++++++++++++
4 files changed, 427 insertions(+), 422 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca32b8f1/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 0d2d448..8729f8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -178,3 +178,8 @@
HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks.
(Yi Liu via Zhe Zhang)
+
+ HADOOP-11921. Enhance tests for erasure coders. (Kai Zheng)
+
+ HDFS-8334. Erasure coding: rename DFSStripedInputStream related test
+ classes. (Zhe Zhang)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca32b8f1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index 11cdf7b..a1f704d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -17,245 +17,202 @@
*/
package org.apache.hadoop.hdfs;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
public class TestDFSStripedInputStream {
- private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
- private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
-
-
- private static DistributedFileSystem fs;
- private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
- private final static int stripesPerBlock = 4;
- static int blockSize = cellSize * stripesPerBlock;
- static int numDNs = dataBlocks + parityBlocks + 2;
-
- private static MiniDFSCluster cluster;
- @BeforeClass
- public static void setup() throws IOException {
- Configuration conf = new Configuration();
- conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
- cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
+ public static final Log LOG = LogFactory.getLog(TestDFSStripedInputStream.class);
+
+ private MiniDFSCluster cluster;
+ private Configuration conf = new Configuration();
+ private DistributedFileSystem fs;
+ private final Path dirPath = new Path("/striped");
+ private Path filePath = new Path(dirPath, "file");
+ private ECInfo info = new ECInfo(filePath.toString(),
+ ECSchemaManager.getSystemDefaultSchema());
+ private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
+ private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
+ private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private final int NUM_STRIPE_PER_BLOCK = 2;
+ private final int INTERNAL_BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
+ private final int BLOCK_GROUP_SIZE = DATA_BLK_NUM * INTERNAL_BLOCK_SIZE;
+
+ @Before
+ public void setup() throws IOException {
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
+ SimulatedFSDataset.setFactory(conf);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+ DATA_BLK_NUM + PARITY_BLK_NUM).build();
+ cluster.waitActive();
fs = cluster.getFileSystem();
+ fs.mkdirs(dirPath);
+ fs.getClient().createErasureCodingZone(dirPath.toString(), null);
}
- @AfterClass
- public static void tearDown() {
+ @After
+ public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
+ /**
+ * Test {@link DFSStripedInputStream#getBlockAt(long)}
+ */
@Test
- public void testFileEmpty() throws IOException {
- testOneFileUsingDFSStripedInputStream("/EmptyFile", 0);
- }
-
- @Test
- public void testFileSmallerThanOneCell1() throws IOException {
- testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1);
- }
-
- @Test
- public void testFileSmallerThanOneCell2() throws IOException {
- testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1);
- }
-
- @Test
- public void testFileEqualsWithOneCell() throws IOException {
- testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize);
- }
-
- @Test
- public void testFileSmallerThanOneStripe1() throws IOException {
- testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
- cellSize * dataBlocks - 1);
- }
-
- @Test
- public void testFileSmallerThanOneStripe2() throws IOException {
- testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
- cellSize + 123);
- }
-
- @Test
- public void testFileEqualsWithOneStripe() throws IOException {
- testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe",
- cellSize * dataBlocks);
- }
-
- @Test
- public void testFileMoreThanOneStripe1() throws IOException {
- testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1",
- cellSize * dataBlocks + 123);
- }
-
- @Test
- public void testFileMoreThanOneStripe2() throws IOException {
- testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2",
- cellSize * dataBlocks + cellSize * dataBlocks + 123);
- }
-
- @Test
- public void testLessThanFullBlockGroup() throws IOException {
- testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup",
- cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize);
- }
-
- @Test
- public void testFileFullBlockGroup() throws IOException {
- testOneFileUsingDFSStripedInputStream("/FullBlockGroup",
- blockSize * dataBlocks);
+ public void testGetBlock() throws Exception {
+ final int numBlocks = 4;
+ DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+ NUM_STRIPE_PER_BLOCK, false);
+ LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+ filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
+ final DFSStripedInputStream in =
+ new DFSStripedInputStream(fs.getClient(), filePath.toString(), false, info);
+
+ List<LocatedBlock> lbList = lbs.getLocatedBlocks();
+ for (LocatedBlock aLbList : lbList) {
+ LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList;
+ LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb,
+ CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
+ for (int j = 0; j < DATA_BLK_NUM; j++) {
+ LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
+ assertEquals(blks[j].getBlock(), refreshed.getBlock());
+ assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
+ assertArrayEquals(blks[j].getLocations(), refreshed.getLocations());
+ }
+ }
}
@Test
- public void testFileMoreThanABlockGroup1() throws IOException {
- testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1",
- blockSize * dataBlocks + 123);
- }
+ public void testPread() throws Exception {
+ final int numBlocks = 2;
+ DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+ NUM_STRIPE_PER_BLOCK, false);
+ LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+ filePath.toString(), 0, BLOCK_GROUP_SIZE);
+
+ assert lbs.get(0) instanceof LocatedStripedBlock;
+ LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
+ for (int i = 0; i < DATA_BLK_NUM; i++) {
+ Block blk = new Block(bg.getBlock().getBlockId() + i,
+ NUM_STRIPE_PER_BLOCK * CELLSIZE,
+ bg.getBlock().getGenerationStamp());
+ blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+ cluster.injectBlocks(i, Arrays.asList(blk),
+ bg.getBlock().getBlockPoolId());
+ }
+ DFSStripedInputStream in =
+ new DFSStripedInputStream(fs.getClient(),
+ filePath.toString(), false, info);
+ int readSize = BLOCK_GROUP_SIZE;
+ byte[] readBuffer = new byte[readSize];
+ int ret = in.read(0, readBuffer, 0, readSize);
- @Test
- public void testFileMoreThanABlockGroup2() throws IOException {
- testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
- blockSize * dataBlocks + cellSize+ 123);
+ assertEquals(readSize, ret);
+ // TODO: verify read results with patterned data from HDFS-8117
}
-
@Test
- public void testFileMoreThanABlockGroup3() throws IOException {
- testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3",
- blockSize * dataBlocks * 3 + cellSize * dataBlocks
- + cellSize + 123);
- }
-
- private byte[] generateBytes(int cnt) {
- byte[] bytes = new byte[cnt];
- for (int i = 0; i < cnt; i++) {
- bytes[i] = getByte(i);
+ public void testStatefulRead() throws Exception {
+ testStatefulRead(false, false);
+ testStatefulRead(true, false);
+ testStatefulRead(true, true);
+ }
+
+ private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
+ throws Exception {
+ final int numBlocks = 2;
+ final int fileSize = numBlocks * BLOCK_GROUP_SIZE;
+ if (cellMisalignPacket) {
+ conf.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT + 1);
+ tearDown();
+ setup();
}
- return bytes;
- }
-
- private byte getByte(long pos) {
- final int mod = 29;
- return (byte) (pos % mod + 1);
- }
-
- private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
- throws IOException {
- Path testPath = new Path(src);
- final byte[] bytes = generateBytes(writeBytes);
- DFSTestUtil.writeFile(fs, testPath, new String(bytes));
-
- //check file length
- FileStatus status = fs.getFileStatus(testPath);
- long fileLength = status.getLen();
- Assert.assertEquals("File length should be the same",
- writeBytes, fileLength);
-
- // pread
- try (FSDataInputStream fsdis = fs.open(new Path(src))) {
- byte[] buf = new byte[writeBytes + 100];
- int readLen = fsdis.read(0, buf, 0, buf.length);
- readLen = readLen >= 0 ? readLen : 0;
- Assert.assertEquals("The length of file should be the same to write size",
- writeBytes, readLen);
- for (int i = 0; i < writeBytes; i++) {
- Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
- buf[i]);
+ DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+ NUM_STRIPE_PER_BLOCK, false);
+ LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+ filePath.toString(), 0, fileSize);
+
+ assert lbs.getLocatedBlocks().size() == numBlocks;
+ for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+ assert lb instanceof LocatedStripedBlock;
+ LocatedStripedBlock bg = (LocatedStripedBlock)(lb);
+ for (int i = 0; i < DATA_BLK_NUM; i++) {
+ Block blk = new Block(bg.getBlock().getBlockId() + i,
+ NUM_STRIPE_PER_BLOCK * CELLSIZE,
+ bg.getBlock().getGenerationStamp());
+ blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+ cluster.injectBlocks(i, Arrays.asList(blk),
+ bg.getBlock().getBlockPoolId());
}
}
- // stateful read with byte array
- try (FSDataInputStream fsdis = fs.open(new Path(src))) {
- byte[] buf = new byte[writeBytes + 100];
- int readLen = 0;
- int ret;
- do {
- ret = fsdis.read(buf, readLen, buf.length - readLen);
- if (ret > 0) {
- readLen += ret;
+ DFSStripedInputStream in =
+ new DFSStripedInputStream(fs.getClient(), filePath.toString(),
+ false, info);
+
+ byte[] expected = new byte[fileSize];
+
+ for (LocatedBlock bg : lbs.getLocatedBlocks()) {
+ /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+ for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+ for (int j = 0; j < DATA_BLK_NUM; j++) {
+ for (int k = 0; k < CELLSIZE; k++) {
+ int posInBlk = i * CELLSIZE + k;
+ int posInFile = (int) bg.getStartOffset() +
+ i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+ expected[posInFile] = SimulatedFSDataset.simulatedByte(
+ new Block(bg.getBlock().getBlockId() + j), posInBlk);
+ }
}
- } while (ret >= 0);
- readLen = readLen >= 0 ? readLen : 0;
- Assert.assertEquals("The length of file should be the same to write size",
- writeBytes, readLen);
- for (int i = 0; i < writeBytes; i++) {
- Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
- buf[i]);
}
}
- // stateful read with ByteBuffer
- try (FSDataInputStream fsdis = fs.open(new Path(src))) {
- ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
- int readLen = 0;
- int ret;
- do {
- ret = fsdis.read(buf);
- if (ret > 0) {
- readLen += ret;
- }
- } while (ret >= 0);
- readLen = readLen >= 0 ? readLen : 0;
- Assert.assertEquals("The length of file should be the same to write size",
- writeBytes, readLen);
- for (int i = 0; i < writeBytes; i++) {
- Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
- buf.array()[i]);
+ if (useByteBuffer) {
+ ByteBuffer readBuffer = ByteBuffer.allocate(fileSize);
+ int done = 0;
+ while (done < fileSize) {
+ int ret = in.read(readBuffer);
+ assertTrue(ret > 0);
+ done += ret;
}
+ assertArrayEquals(expected, readBuffer.array());
+ } else {
+ byte[] readBuffer = new byte[fileSize];
+ int done = 0;
+ while (done < fileSize) {
+ int ret = in.read(readBuffer, done, fileSize - done);
+ assertTrue(ret > 0);
+ done += ret;
+ }
+ assertArrayEquals(expected, readBuffer);
}
-
- // stateful read with 1KB size byte array
- try (FSDataInputStream fsdis = fs.open(new Path(src))) {
- final byte[] result = new byte[writeBytes];
- final byte[] buf = new byte[1024];
- int readLen = 0;
- int ret;
- do {
- ret = fsdis.read(buf, 0, buf.length);
- if (ret > 0) {
- System.arraycopy(buf, 0, result, readLen, ret);
- readLen += ret;
- }
- } while (ret >= 0);
- Assert.assertEquals("The length of file should be the same to write size",
- writeBytes, readLen);
- Assert.assertArrayEquals(bytes, result);
- }
-
- // stateful read using ByteBuffer with 1KB size
- try (FSDataInputStream fsdis = fs.open(new Path(src))) {
- final ByteBuffer result = ByteBuffer.allocate(writeBytes);
- final ByteBuffer buf = ByteBuffer.allocate(1024);
- int readLen = 0;
- int ret;
- do {
- ret = fsdis.read(buf);
- if (ret > 0) {
- readLen += ret;
- buf.flip();
- result.put(buf);
- buf.clear();
- }
- } while (ret >= 0);
- Assert.assertEquals("The length of file should be the same to write size",
- writeBytes, readLen);
- Assert.assertArrayEquals(bytes, result.array());
- }
+ fs.delete(filePath, true);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca32b8f1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
deleted file mode 100644
index 1ad480e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ECInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
-import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-public class TestReadStripedFile {
-
- public static final Log LOG = LogFactory.getLog(TestReadStripedFile.class);
-
- private MiniDFSCluster cluster;
- private Configuration conf = new Configuration();
- private DistributedFileSystem fs;
- private final Path dirPath = new Path("/striped");
- private Path filePath = new Path(dirPath, "file");
- private ECInfo info = new ECInfo(filePath.toString(),
- ECSchemaManager.getSystemDefaultSchema());
- private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
- private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
- private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
- private final int NUM_STRIPE_PER_BLOCK = 2;
- private final int INTERNAL_BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
- private final int BLOCK_GROUP_SIZE = DATA_BLK_NUM * INTERNAL_BLOCK_SIZE;
-
- @Before
- public void setup() throws IOException {
- conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
- SimulatedFSDataset.setFactory(conf);
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
- DATA_BLK_NUM + PARITY_BLK_NUM).build();
- cluster.waitActive();
- fs = cluster.getFileSystem();
- fs.mkdirs(dirPath);
- fs.getClient().createErasureCodingZone(dirPath.toString(), null);
- }
-
- @After
- public void tearDown() {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
- /**
- * Test {@link DFSStripedInputStream#getBlockAt(long)}
- */
- @Test
- public void testGetBlock() throws Exception {
- final int numBlocks = 4;
- DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
- NUM_STRIPE_PER_BLOCK, false);
- LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
- filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
- final DFSStripedInputStream in =
- new DFSStripedInputStream(fs.getClient(), filePath.toString(), false, info);
-
- List<LocatedBlock> lbList = lbs.getLocatedBlocks();
- for (LocatedBlock aLbList : lbList) {
- LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList;
- LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb,
- CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
- for (int j = 0; j < DATA_BLK_NUM; j++) {
- LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
- assertEquals(blks[j].getBlock(), refreshed.getBlock());
- assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
- assertArrayEquals(blks[j].getLocations(), refreshed.getLocations());
- }
- }
- }
-
- @Test
- public void testPread() throws Exception {
- final int numBlocks = 2;
- DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
- NUM_STRIPE_PER_BLOCK, false);
- LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
- filePath.toString(), 0, BLOCK_GROUP_SIZE);
-
- assert lbs.get(0) instanceof LocatedStripedBlock;
- LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
- for (int i = 0; i < DATA_BLK_NUM; i++) {
- Block blk = new Block(bg.getBlock().getBlockId() + i,
- NUM_STRIPE_PER_BLOCK * CELLSIZE,
- bg.getBlock().getGenerationStamp());
- blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
- cluster.injectBlocks(i, Arrays.asList(blk),
- bg.getBlock().getBlockPoolId());
- }
- DFSStripedInputStream in =
- new DFSStripedInputStream(fs.getClient(),
- filePath.toString(), false, info);
- int readSize = BLOCK_GROUP_SIZE;
- byte[] readBuffer = new byte[readSize];
- int ret = in.read(0, readBuffer, 0, readSize);
-
- assertEquals(readSize, ret);
- // TODO: verify read results with patterned data from HDFS-8117
- }
-
- @Test
- public void testStatefulRead() throws Exception {
- testStatefulRead(false, false);
- testStatefulRead(true, false);
- testStatefulRead(true, true);
- }
-
- private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
- throws Exception {
- final int numBlocks = 2;
- final int fileSize = numBlocks * BLOCK_GROUP_SIZE;
- if (cellMisalignPacket) {
- conf.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT + 1);
- tearDown();
- setup();
- }
- DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
- NUM_STRIPE_PER_BLOCK, false);
- LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
- filePath.toString(), 0, fileSize);
-
- assert lbs.getLocatedBlocks().size() == numBlocks;
- for (LocatedBlock lb : lbs.getLocatedBlocks()) {
- assert lb instanceof LocatedStripedBlock;
- LocatedStripedBlock bg = (LocatedStripedBlock)(lb);
- for (int i = 0; i < DATA_BLK_NUM; i++) {
- Block blk = new Block(bg.getBlock().getBlockId() + i,
- NUM_STRIPE_PER_BLOCK * CELLSIZE,
- bg.getBlock().getGenerationStamp());
- blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
- cluster.injectBlocks(i, Arrays.asList(blk),
- bg.getBlock().getBlockPoolId());
- }
- }
-
- DFSStripedInputStream in =
- new DFSStripedInputStream(fs.getClient(), filePath.toString(),
- false, info);
-
- byte[] expected = new byte[fileSize];
-
- for (LocatedBlock bg : lbs.getLocatedBlocks()) {
- /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
- for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
- for (int j = 0; j < DATA_BLK_NUM; j++) {
- for (int k = 0; k < CELLSIZE; k++) {
- int posInBlk = i * CELLSIZE + k;
- int posInFile = (int) bg.getStartOffset() +
- i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
- expected[posInFile] = SimulatedFSDataset.simulatedByte(
- new Block(bg.getBlock().getBlockId() + j), posInBlk);
- }
- }
- }
- }
-
- if (useByteBuffer) {
- ByteBuffer readBuffer = ByteBuffer.allocate(fileSize);
- int done = 0;
- while (done < fileSize) {
- int ret = in.read(readBuffer);
- assertTrue(ret > 0);
- done += ret;
- }
- assertArrayEquals(expected, readBuffer.array());
- } else {
- byte[] readBuffer = new byte[fileSize];
- int done = 0;
- while (done < fileSize) {
- int ret = in.read(readBuffer, done, fileSize - done);
- assertTrue(ret > 0);
- done += ret;
- }
- assertArrayEquals(expected, readBuffer);
- }
- fs.delete(filePath, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca32b8f1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
new file mode 100644
index 0000000..eacc6ed
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class TestWriteReadStripedFile {
+ private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+ private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+
+
+ private static DistributedFileSystem fs;
+ private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private final static int stripesPerBlock = 4;
+ static int blockSize = cellSize * stripesPerBlock;
+ static int numDNs = dataBlocks + parityBlocks + 2;
+
+ private static MiniDFSCluster cluster;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+ cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
+ fs = cluster.getFileSystem();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testFileEmpty() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/EmptyFile", 0);
+ }
+
+ @Test
+ public void testFileSmallerThanOneCell1() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1);
+ }
+
+ @Test
+ public void testFileSmallerThanOneCell2() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1);
+ }
+
+ @Test
+ public void testFileEqualsWithOneCell() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize);
+ }
+
+ @Test
+ public void testFileSmallerThanOneStripe1() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
+ cellSize * dataBlocks - 1);
+ }
+
+ @Test
+ public void testFileSmallerThanOneStripe2() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
+ cellSize + 123);
+ }
+
+ @Test
+ public void testFileEqualsWithOneStripe() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe",
+ cellSize * dataBlocks);
+ }
+
+ @Test
+ public void testFileMoreThanOneStripe1() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1",
+ cellSize * dataBlocks + 123);
+ }
+
+ @Test
+ public void testFileMoreThanOneStripe2() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2",
+ cellSize * dataBlocks + cellSize * dataBlocks + 123);
+ }
+
+ @Test
+ public void testLessThanFullBlockGroup() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup",
+ cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize);
+ }
+
+ @Test
+ public void testFileFullBlockGroup() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/FullBlockGroup",
+ blockSize * dataBlocks);
+ }
+
+ @Test
+ public void testFileMoreThanABlockGroup1() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1",
+ blockSize * dataBlocks + 123);
+ }
+
+ @Test
+ public void testFileMoreThanABlockGroup2() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
+ blockSize * dataBlocks + cellSize+ 123);
+ }
+
+
+ @Test
+ public void testFileMoreThanABlockGroup3() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3",
+ blockSize * dataBlocks * 3 + cellSize * dataBlocks
+ + cellSize + 123);
+ }
+
+ private byte[] generateBytes(int cnt) {
+ byte[] bytes = new byte[cnt];
+ for (int i = 0; i < cnt; i++) {
+ bytes[i] = getByte(i);
+ }
+ return bytes;
+ }
+
+ private byte getByte(long pos) {
+ final int mod = 29;
+ return (byte) (pos % mod + 1);
+ }
+
+ private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
+ throws IOException {
+ Path testPath = new Path(src);
+ final byte[] bytes = generateBytes(writeBytes);
+ DFSTestUtil.writeFile(fs, testPath, new String(bytes));
+
+ //check file length
+ FileStatus status = fs.getFileStatus(testPath);
+ long fileLength = status.getLen();
+ Assert.assertEquals("File length should be the same",
+ writeBytes, fileLength);
+
+ // pread
+ try (FSDataInputStream fsdis = fs.open(new Path(src))) {
+ byte[] buf = new byte[writeBytes + 100];
+ int readLen = fsdis.read(0, buf, 0, buf.length);
+ readLen = readLen >= 0 ? readLen : 0;
+ Assert.assertEquals("The length of file should be the same to write size",
+ writeBytes, readLen);
+ for (int i = 0; i < writeBytes; i++) {
+ Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
+ buf[i]);
+ }
+ }
+
+ // stateful read with byte array
+ try (FSDataInputStream fsdis = fs.open(new Path(src))) {
+ byte[] buf = new byte[writeBytes + 100];
+ int readLen = 0;
+ int ret;
+ do {
+ ret = fsdis.read(buf, readLen, buf.length - readLen);
+ if (ret > 0) {
+ readLen += ret;
+ }
+ } while (ret >= 0);
+ readLen = readLen >= 0 ? readLen : 0;
+ Assert.assertEquals("The length of file should be the same to write size",
+ writeBytes, readLen);
+ for (int i = 0; i < writeBytes; i++) {
+ Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
+ buf[i]);
+ }
+ }
+
+ // stateful read with ByteBuffer
+ try (FSDataInputStream fsdis = fs.open(new Path(src))) {
+ ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
+ int readLen = 0;
+ int ret;
+ do {
+ ret = fsdis.read(buf);
+ if (ret > 0) {
+ readLen += ret;
+ }
+ } while (ret >= 0);
+ readLen = readLen >= 0 ? readLen : 0;
+ Assert.assertEquals("The length of file should be the same to write size",
+ writeBytes, readLen);
+ for (int i = 0; i < writeBytes; i++) {
+ Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
+ buf.array()[i]);
+ }
+ }
+
+ // stateful read with 1KB size byte array
+ try (FSDataInputStream fsdis = fs.open(new Path(src))) {
+ final byte[] result = new byte[writeBytes];
+ final byte[] buf = new byte[1024];
+ int readLen = 0;
+ int ret;
+ do {
+ ret = fsdis.read(buf, 0, buf.length);
+ if (ret > 0) {
+ System.arraycopy(buf, 0, result, readLen, ret);
+ readLen += ret;
+ }
+ } while (ret >= 0);
+ Assert.assertEquals("The length of file should be the same to write size",
+ writeBytes, readLen);
+ Assert.assertArrayEquals(bytes, result);
+ }
+
+ // stateful read using ByteBuffer with 1KB size
+ try (FSDataInputStream fsdis = fs.open(new Path(src))) {
+ final ByteBuffer result = ByteBuffer.allocate(writeBytes);
+ final ByteBuffer buf = ByteBuffer.allocate(1024);
+ int readLen = 0;
+ int ret;
+ do {
+ ret = fsdis.read(buf);
+ if (ret > 0) {
+ readLen += ret;
+ buf.flip();
+ result.put(buf);
+ buf.clear();
+ }
+ } while (ret >= 0);
+ Assert.assertEquals("The length of file should be the same to write size",
+ writeBytes, readLen);
+ Assert.assertArrayEquals(bytes, result.array());
+ }
+ }
+}