You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/30 20:23:02 UTC
[28/52] [abbrv] hadoop git commit: HDFS-9040. Erasure coding:
coordinate data streamers in DFSStripedOutputStream. Contributed by Jing Zhao
and Walter Su.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 8d4a0cf..12453fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -20,23 +20,35 @@ package org.apache.hadoop.hdfs;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.junit.Assert;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Random;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.junit.Assert.assertEquals;
+
public class StripedFileTestUtil {
public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class);
/*
@@ -50,8 +62,8 @@ public class StripedFileTestUtil {
static final int stripesPerBlock = 4;
static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
static final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
+ static final int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
- static final Random random = new Random();
static byte[] generateBytes(int cnt) {
byte[] bytes = new byte[cnt];
@@ -61,6 +73,11 @@ public class StripedFileTestUtil {
return bytes;
}
+ static byte getByte(long pos) { // deterministic test byte for file offset pos
+ final int mod = 29;
+ return (byte) (pos % mod + 1); // for pos >= 0 the result is in 1..29, so it is never 0
+ }
+
static int readAll(FSDataInputStream in, byte[] buf) throws IOException {
int readLen = 0;
int ret;
@@ -71,15 +88,10 @@ public class StripedFileTestUtil {
return readLen;
}
- static byte getByte(long pos) {
- final int mod = 29;
- return (byte) (pos % mod + 1);
- }
-
static void verifyLength(FileSystem fs, Path srcPath, int fileLength)
throws IOException {
FileStatus status = fs.getFileStatus(srcPath);
- Assert.assertEquals("File length should be the same", fileLength, status.getLen());
+ assertEquals("File length should be the same", fileLength, status.getLen());
}
static void verifyPread(FileSystem fs, Path srcPath, int fileLength,
@@ -101,9 +113,7 @@ public class StripedFileTestUtil {
offset += target;
}
for (int i = 0; i < fileLength - startOffset; i++) {
- Assert.assertEquals("Byte at " + (startOffset + i) + " is different, "
- + "the startOffset is " + startOffset,
- expected[startOffset + i], result[i]);
+ assertEquals("Byte at " + (startOffset + i) + " is different, " + "the startOffset is " + startOffset, expected[startOffset + i], result[i]);
}
}
}
@@ -119,8 +129,7 @@ public class StripedFileTestUtil {
System.arraycopy(buf, 0, result, readLen, ret);
readLen += ret;
}
- Assert.assertEquals("The length of file should be the same to write size",
- fileLength, readLen);
+ assertEquals("The length of file should be the same to write size", fileLength, readLen);
Assert.assertArrayEquals(expected, result);
}
}
@@ -137,8 +146,7 @@ public class StripedFileTestUtil {
result.put(buf);
buf.clear();
}
- Assert.assertEquals("The length of file should be the same to write size",
- fileLength, readLen);
+ assertEquals("The length of file should be the same to write size", fileLength, readLen);
Assert.assertArrayEquals(expected, result.array());
}
}
@@ -199,10 +207,9 @@ public class StripedFileTestUtil {
fsdis.seek(pos);
byte[] buf = new byte[writeBytes];
int readLen = StripedFileTestUtil.readAll(fsdis, buf);
- Assert.assertEquals(readLen, writeBytes - pos);
+ assertEquals(readLen, writeBytes - pos);
for (int i = 0; i < readLen; i++) {
- Assert.assertEquals("Byte at " + i + " should be the same",
- StripedFileTestUtil.getByte(pos + i), buf[i]);
+ assertEquals("Byte at " + i + " should be the same", StripedFileTestUtil.getByte(pos + i), buf[i]);
}
}
@@ -210,6 +217,7 @@ public class StripedFileTestUtil {
final int dnIndex, final AtomicInteger pos) {
final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
final DatanodeInfo datanode = getDatanodes(s);
+ assert datanode != null;
LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
cluster.stopDataNode(datanode.getXferAddr());
}
@@ -218,7 +226,7 @@ public class StripedFileTestUtil {
for(;;) {
final DatanodeInfo[] datanodes = streamer.getNodes();
if (datanodes != null) {
- Assert.assertEquals(1, datanodes.length);
+ assertEquals(1, datanodes.length);
Assert.assertNotNull(datanodes[0]);
return datanodes[0];
}
@@ -287,7 +295,6 @@ public class StripedFileTestUtil {
* @param min minimum of the range
* @param max maximum of the range
* @param n number to be generated
- * @return
*/
public static int[] randomArray(int min, int max, int n){
if (n > (max - min + 1) || max < min || min < 0 || max < 0) {
@@ -315,4 +322,170 @@ public class StripedFileTestUtil {
}
return result;
}
+
+ /**
+ * Verify that, for every located block in {@code lbs}, the internal blocks
+ * of the striped block group are on different nodes and every internal
+ * block exists.
+ *
+ * For each located block this asserts that it reports exactly
+ * {@code groupSize} locations with no duplicates, and exactly
+ * {@code groupSize} distinct block indices.
+ *
+ * @param lbs the located blocks to check
+ * @param groupSize expected number of internal blocks per block group
+ */
+ public static void verifyLocatedStripedBlocks(LocatedBlocks lbs, int groupSize) {
+ for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+ assert lb instanceof LocatedStripedBlock; // plain Java assert: only checked when assertions are enabled
+ HashSet<DatanodeInfo> locs = new HashSet<>();
+ Collections.addAll(locs, lb.getLocations());
+ assertEquals(groupSize, lb.getLocations().length);
+ assertEquals(groupSize, locs.size()); // the set drops duplicates, so equal size means all locations differ
+
+ // verify that every internal block exists
+ int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
+ assertEquals(groupSize, blockIndices.length);
+ HashSet<Integer> found = new HashSet<>();
+ for (int index : blockIndices) {
+ assert index >=0;
+ found.add(index);
+ }
+ assertEquals(groupSize, found.size()); // groupSize distinct indices were reported
+ }
+ }
+ /**
+ * Read a striped file back block by block and verify its length, its block
+ * group layout, its data bytes and its parity bytes.
+ *
+ * Steps: (1) check the file length; (2) fetch the located blocks and assert
+ * that the number of block groups is ceil(length / BLOCK_GROUP_SIZE), or 0
+ * for an empty file; (3) for each group assert that it is a
+ * LocatedStripedBlock whose generation stamp is at least {@code oldGS}, and
+ * split it into internal blocks; (4) for each internal block assert its
+ * expected size and read its bytes unless its index is in
+ * {@code killedDnIndex}; (5) compare every data byte with
+ * {@link #getByte(long)}; (6) hand the data and parity buffers to
+ * {@link #verifyParityBlocks}.
+ *
+ * Side effect: {@code killedDnIndex} is sorted in place.
+ *
+ * @param dfs file system used to look up and read the blocks
+ * @param srcPath path of the file to verify
+ * @param length expected file length in bytes
+ * @param killedDnIndex indices, within a block group, of the internal blocks
+ * that are not read back
+ * @param oldGS lower bound for the generation stamp of every block group
+ * @throws IOException declared by the length check, block lookup and reads
+ */
+ static void checkData(DistributedFileSystem dfs, Path srcPath, int length,
+ int[] killedDnIndex, long oldGS) throws IOException {
+
+ StripedFileTestUtil.verifyLength(dfs, srcPath, length);
+ Arrays.sort(killedDnIndex); // sorted in place; the binarySearch calls below rely on this order
+ List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
+ LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(srcPath.toString(), 0L,
+ Long.MAX_VALUE);
+ int expectedNumGroup = 0;
+ if (length > 0) {
+ expectedNumGroup = (length - 1) / BLOCK_GROUP_SIZE + 1; // ceil(length / BLOCK_GROUP_SIZE)
+ }
+ assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
+
+ for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
+ Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
+
+ final long gs = firstBlock.getBlock().getGenerationStamp();
+ final String s = "gs=" + gs + ", oldGS=" + oldGS;
+ LOG.info(s);
+ Assert.assertTrue(s, gs >= oldGS);
+
+ LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+ (LocatedStripedBlock) firstBlock, BLOCK_STRIPED_CELL_SIZE,
+ NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+ blockGroupList.add(Arrays.asList(blocks));
+ }
+
+ // test each block group
+ for (int group = 0; group < blockGroupList.size(); group++) {
+ final boolean isLastGroup = group == blockGroupList.size() - 1;
+ final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE
+ : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE; // file bytes that fall in this group
+ final int numCellInGroup = (groupSize - 1)/BLOCK_STRIPED_CELL_SIZE + 1; // ceil(groupSize / cell size)
+ final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS; // data block that holds the last cell
+ final int lastCellSize = groupSize - (numCellInGroup - 1)*BLOCK_STRIPED_CELL_SIZE; // the last cell may be partial
+
+ // get the data of this block group
+ List<LocatedBlock> blockList = blockGroupList.get(group);
+ byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
+ byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
+
+ // for each block, use BlockReader to read data
+ for (int i = 0; i < blockList.size(); i++) {
+ final int j = i >= NUM_DATA_BLOCKS? 0: i; // parity blocks are sized like data block 0
+ final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS
+ + (j <= lastCellIndex? 1: 0);
+ final int blockSize = numCellInBlock*BLOCK_STRIPED_CELL_SIZE
+ + (isLastGroup && j == lastCellIndex? lastCellSize - BLOCK_STRIPED_CELL_SIZE: 0); // shrink by the shortfall of a partial last cell
+
+ final byte[] blockBytes = new byte[blockSize];
+ if (i < NUM_DATA_BLOCKS) {
+ dataBlockBytes[i] = blockBytes;
+ } else {
+ parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes;
+ }
+
+ final LocatedBlock lb = blockList.get(i);
+ LOG.info("i,j=" + i + ", " + j + ", numCellInBlock=" + numCellInBlock
+ + ", blockSize=" + blockSize + ", lb=" + lb);
+ if (lb == null) { // no located block at this index; its buffer stays zero-filled
+ continue;
+ }
+ final ExtendedBlock block = lb.getBlock();
+ assertEquals(blockSize, block.getNumBytes());
+
+ if (block.getNumBytes() == 0) { // nothing to read
+ continue;
+ }
+
+ if (Arrays.binarySearch(killedDnIndex, i) < 0) { // only read blocks whose index is not listed as killed
+ final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
+ dfs, lb, 0, block.getNumBytes());
+ blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
+ blockReader.close();
+ }
+ }
+
+ // check data
+ final int groupPosInFile = group*BLOCK_GROUP_SIZE;
+ for (int i = 0; i < dataBlockBytes.length; i++) {
+ boolean killed = false;
+ if (Arrays.binarySearch(killedDnIndex, i) >= 0){
+ killed = true;
+ }
+ final byte[] actual = dataBlockBytes[i];
+ for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) {
+ final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(
+ BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile;
+ Assert.assertTrue(posInFile < length);
+ final byte expected = getByte(posInFile);
+
+ if (killed) {
+ actual[posInBlk] = expected; // block was not read back: fill in the expected byte for the parity check below
+ } else {
+ if(expected != actual[posInBlk]){
+ String s = "expected=" + expected + " but actual=" + actual[posInBlk]
+ + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk
+ + ". group=" + group + ", i=" + i;
+ Assert.fail(s);
+ }
+ }
+ }
+ }
+
+ // check parity
+ verifyParityBlocks(dfs.getConf(), lbs.getLocatedBlocks().get(group)
+ .getBlockSize(),
+ BLOCK_STRIPED_CELL_SIZE, dataBlockBytes, parityBlockBytes, killedDnIndex);
+ }
+ }
+ /**
+ * Recompute the parity blocks from {@code dataBytes} with an RS raw encoder
+ * and assert that they equal {@code parityBytes}. Parity block {@code i} is
+ * skipped when {@code dataBytes.length + i} is listed in
+ * {@code killedDnIndex}.
+ *
+ * Side effects: {@code killedDnIndex} is sorted in place, and every entry of
+ * {@code dataBytes} that is null or shorter than {@code dataBytes[0]} is
+ * replaced by a zero-padded array of length {@code dataBytes[0].length}.
+ * {@code dataBytes[0]} itself must be non-null, because its length is read
+ * unconditionally.
+ *
+ * @param conf configuration passed to the encoder factory
+ * @param size size passed to StripedBlockUtil.getInternalBlockLength;
+ * checkData in this file passes the located block's getBlockSize()
+ * @param cellSize striping cell size in bytes
+ * @param dataBytes contents of the data blocks, one array per block
+ * @param parityBytes contents of the parity blocks as read back
+ * @param killedDnIndex indices, within the block group, of blocks to skip
+ */
+ static void verifyParityBlocks(Configuration conf, final long size, final int cellSize,
+ byte[][] dataBytes, byte[][] parityBytes, int[] killedDnIndex) {
+ Arrays.sort(killedDnIndex); // sorted in place; binarySearch below relies on this order
+ // verify the parity blocks
+ int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
+ size, cellSize, dataBytes.length, dataBytes.length); // NOTE(review): last argument is presumably the index of the first parity block - confirm against StripedBlockUtil
+ final byte[][] expectedParityBytes = new byte[parityBytes.length][];
+ for (int i = 0; i < parityBytes.length; i++) {
+ expectedParityBytes[i] = new byte[parityBlkSize];
+ }
+ for (int i = 0; i < dataBytes.length; i++) { // bring every data buffer to the length of dataBytes[0]
+ if (dataBytes[i] == null) {
+ dataBytes[i] = new byte[dataBytes[0].length];
+ } else if (dataBytes[i].length < dataBytes[0].length) {
+ final byte[] tmp = dataBytes[i];
+ dataBytes[i] = new byte[dataBytes[0].length];
+ System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length); // keep the existing bytes, zero-pad the tail
+ }
+ }
+ final RawErasureEncoder encoder =
+ CodecUtil.createRSRawEncoder(conf, dataBytes.length, parityBytes.length);
+ encoder.encode(dataBytes, expectedParityBytes);
+ for (int i = 0; i < parityBytes.length; i++) {
+ if (Arrays.binarySearch(killedDnIndex, dataBytes.length + i) < 0){ // parity block i has group index dataBytes.length + i
+ Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + Arrays.toString(killedDnIndex),
+ expectedParityBytes[i], parityBytes[i]);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index 0641e8e..d78e88b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -18,26 +18,14 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.apache.hadoop.io.erasurecode.CodecUtil;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -154,141 +142,15 @@ public class TestDFSStripedOutputStream {
+ cellSize + 123);
}
- private byte[] generateBytes(int cnt) {
- byte[] bytes = new byte[cnt];
- for (int i = 0; i < cnt; i++) {
- bytes[i] = getByte(i);
- }
- return bytes;
- }
-
- private byte getByte(long pos) {
- int mod = 29;
- return (byte) (pos % mod + 1);
- }
-
private void testOneFile(String src, int writeBytes) throws Exception {
src += "_" + writeBytes;
Path testPath = new Path(src);
- byte[] bytes = generateBytes(writeBytes);
+ byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes);
DFSTestUtil.writeFile(fs, testPath, new String(bytes));
StripedFileTestUtil.waitBlockGroupsReported(fs, src);
- // check file length
- FileStatus status = fs.getFileStatus(testPath);
- Assert.assertEquals(writeBytes, status.getLen());
-
- checkData(src, writeBytes);
- }
-
- void checkData(String src, int writeBytes) throws IOException {
- List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
- LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
-
- for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
- Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
- LocatedBlock[] blocks = StripedBlockUtil.
- parseStripedBlockGroup((LocatedStripedBlock) firstBlock,
- cellSize, dataBlocks, parityBlocks);
- List<LocatedBlock> oneGroup = Arrays.asList(blocks);
- blockGroupList.add(oneGroup);
- }
-
- // test each block group
- for (int group = 0; group < blockGroupList.size(); group++) {
- //get the data of this block
- List<LocatedBlock> blockList = blockGroupList.get(group);
- byte[][] dataBlockBytes = new byte[dataBlocks][];
- byte[][] parityBlockBytes = new byte[parityBlocks][];
-
- // for each block, use BlockReader to read data
- for (int i = 0; i < blockList.size(); i++) {
- LocatedBlock lblock = blockList.get(i);
- if (lblock == null) {
- continue;
- }
- ExtendedBlock block = lblock.getBlock();
- byte[] blockBytes = new byte[(int)block.getNumBytes()];
- if (i < dataBlocks) {
- dataBlockBytes[i] = blockBytes;
- } else {
- parityBlockBytes[i - dataBlocks] = blockBytes;
- }
-
- if (block.getNumBytes() == 0) {
- continue;
- }
-
- final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
- fs, lblock, 0, block.getNumBytes());
- blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
- blockReader.close();
- }
-
- // check if we write the data correctly
- for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length;
- blkIdxInGroup++) {
- final byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup];
- if (actualBlkBytes == null) {
- continue;
- }
- for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) {
- // calculate the position of this byte in the file
- long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize,
- dataBlocks, posInBlk, blkIdxInGroup) +
- group * blockSize * dataBlocks;
- Assert.assertTrue(posInFile < writeBytes);
- final byte expected = getByte(posInFile);
-
- String s = "Unexpected byte " + actualBlkBytes[posInBlk]
- + ", expect " + expected
- + ". Block group index is " + group
- + ", stripe index is " + posInBlk / cellSize
- + ", cell index is " + blkIdxInGroup
- + ", byte index is " + posInBlk % cellSize;
- Assert.assertEquals(s, expected, actualBlkBytes[posInBlk]);
- }
- }
-
- verifyParity(lbs.getLocatedBlocks().get(group).getBlockSize(),
- cellSize, dataBlockBytes, parityBlockBytes);
- }
- }
-
- void verifyParity(final long size, final int cellSize,
- byte[][] dataBytes, byte[][] parityBytes) {
- verifyParity(conf, size, cellSize, dataBytes, parityBytes, -1);
- }
-
- static void verifyParity(Configuration conf, final long size,
- final int cellSize, byte[][] dataBytes,
- byte[][] parityBytes, int killedDnIndex) {
- // verify the parity blocks
- int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
- size, cellSize, dataBytes.length, dataBytes.length);
- final byte[][] expectedParityBytes = new byte[parityBytes.length][];
- for (int i = 0; i < parityBytes.length; i++) {
- expectedParityBytes[i] = new byte[parityBlkSize];
- }
- for (int i = 0; i < dataBytes.length; i++) {
- if (dataBytes[i] == null) {
- dataBytes[i] = new byte[dataBytes[0].length];
- } else if (dataBytes[i].length < dataBytes[0].length) {
- final byte[] tmp = dataBytes[i];
- dataBytes[i] = new byte[dataBytes[0].length];
- System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length);
- }
- }
- final RawErasureEncoder encoder =
- CodecUtil.createRSRawEncoder(conf,
- dataBytes.length, parityBytes.length);
- encoder.encode(dataBytes, expectedParityBytes);
- for (int i = 0; i < parityBytes.length; i++) {
- if (i != killedDnIndex) {
- Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + killedDnIndex,
- expectedParityBytes[i], parityBytes[i]);
- }
- }
+ StripedFileTestUtil.checkData(fs, testPath, writeBytes,
+ new int[]{}, 0);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 44a29e6..f6c2566 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
@@ -30,23 +31,18 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
@@ -74,6 +70,7 @@ public class TestDFSStripedOutputStreamWithFailure {
private static final int FLUSH_POS
= 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
+
static {
System.out.println("NUM_DATA_BLOCKS = " + NUM_DATA_BLOCKS);
System.out.println("NUM_PARITY_BLOCKS= " + NUM_PARITY_BLOCKS);
@@ -101,6 +98,32 @@ public class TestDFSStripedOutputStreamWithFailure {
return lengths;
}
+ private static final int[][] dnIndexSuite = { // each row is one scenario: the datanode indices to kill, in kill order
+ {0, 1},
+ {0, 5},
+ {0, 6},
+ {0, 8},
+ {1, 5},
+ {1, 6},
+ {6, 8},
+ {0, 1, 2},
+ {3, 4, 5},
+ {0, 1, 6},
+ {0, 5, 6},
+ {0, 5, 8},
+ {0, 6, 7},
+ {5, 6, 7},
+ {6, 7, 8},
+ };
+ /**
+ * Compute {@code num} kill positions that split {@code fileLen} into
+ * {@code num + 1} parts: position {@code i} is
+ * {@code fileLen * (i + 1) / (num + 1)} using int arithmetic, so the
+ * returned positions are in non-decreasing order.
+ *
+ * NOTE(review): {@code fileLen * (i + 1)} is an int product and could
+ * overflow for very large {@code fileLen}; confirm the lengths used by
+ * callers stay small.
+ *
+ * @param fileLen length of the file being written
+ * @param num number of positions to generate
+ * @return an array of {@code num} positions
+ */
+ private int[] getKillPositions(int fileLen, int num) {
+ int[] positions = new int[num];
+ for (int i = 0; i < num; i++) {
+ positions[i] = fileLen * (i + 1) / (num + 1);
+ }
+ return positions;
+ }
+
private static final List<Integer> LENGTHS = newLengths();
static int getLength(int i) {
@@ -127,42 +150,26 @@ public class TestDFSStripedOutputStreamWithFailure {
}
}
- private static byte getByte(long pos) {
- return (byte)pos;
- }
-
private HdfsConfiguration newHdfsConfiguration() {
final HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
- conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
+ conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
return conf;
}
- void runTest(final int length) {
- final HdfsConfiguration conf = newHdfsConfiguration();
- for (int dn = 0; dn < 9; dn++) {
- try {
- setup(conf);
- runTest(length, dn, false, conf);
- } catch (Exception e) {
- final String err = "failed, dn=" + dn + ", length=" + length
- + StringUtils.stringifyException(e);
- LOG.error(err);
- Assert.fail(err);
- } finally {
- tearDown();
- }
- }
- }
-
@Test(timeout=240000)
public void testDatanodeFailure56() throws Exception {
runTest(getLength(56));
}
@Test(timeout=240000)
+ public void testMultipleDatanodeFailure56() throws Exception {
+ runTestWithMultipleFailure(getLength(56));
+ }
+
+ @Test(timeout=240000)
public void testBlockTokenExpired() throws Exception {
final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE);
final HdfsConfiguration conf = newHdfsConfiguration();
@@ -174,7 +181,7 @@ public class TestDFSStripedOutputStreamWithFailure {
for (int dn = 0; dn < 9; dn += 2) {
try {
setup(conf);
- runTest(length, dn, true, conf);
+ runTest(length, new int[]{length/2}, new int[]{dn}, true);
} catch (Exception e) {
LOG.error("failed, dn=" + dn + ", length=" + length);
throw e;
@@ -214,22 +221,8 @@ public class TestDFSStripedOutputStreamWithFailure {
Assert.fail("Failed to validate available dns against blkGroupSize");
} catch (IOException ioe) {
// expected
- GenericTestUtils.assertExceptionContains("Failed: the number of "
- + "remaining blocks = 5 < the number of data blocks = 6", ioe);
- DFSStripedOutputStream dfsout = (DFSStripedOutputStream) out
- .getWrappedStream();
-
- // get leading streamer and verify the last exception
- StripedDataStreamer datastreamer = dfsout.getStripedDataStreamer(0);
- try {
- datastreamer.getLastException().check(true);
- Assert.fail("Failed to validate available dns against blkGroupSize");
- } catch (IOException le) {
- GenericTestUtils.assertExceptionContains(
- "Failed to get datablocks number of nodes from"
- + " namenode: blockGroupSize= 9, blocks.length= "
- + numDatanodes, le);
- }
+ GenericTestUtils.assertExceptionContains("Failed to get 6 nodes from" +
+ " namenode: blockGroupSize= 9, blocks.length= 5", ioe);
}
} finally {
tearDown();
@@ -258,42 +251,73 @@ public class TestDFSStripedOutputStreamWithFailure {
int fileLength = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE - 1000;
final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
DFSTestUtil.writeFile(dfs, srcPath, new String(expected));
+ LOG.info("writing finished. Seek and read the file to verify.");
StripedFileTestUtil.verifySeek(dfs, srcPath, fileLength);
} finally {
tearDown();
}
}
- private void runTest(final int length, final int dnIndex,
- final boolean tokenExpire, final HdfsConfiguration conf) {
- try {
- runTest(length, length/2, dnIndex, tokenExpire, conf);
- } catch(Exception e) {
- LOG.info("FAILED", e);
- Assert.fail(StringUtils.stringifyException(e));
+ void runTest(final int length) { // single-failure driver: one run per datanode index 0..8
+ final HdfsConfiguration conf = newHdfsConfiguration();
+ for (int dn = 0; dn < 9; dn++) { // dn is the index of the datanode killed in this run
+ try {
+ setup(conf);
+ runTest(length, new int[]{length/2}, new int[]{dn}, false); // one kill at position length/2, tokenExpire off
+ } catch (Throwable e) { // Throwable, so assertion errors are also reported with dn/length context
+ final String err = "failed, dn=" + dn + ", length=" + length
+ + StringUtils.stringifyException(e);
+ LOG.error(err);
+ Assert.fail(err);
+ } finally {
+ tearDown(); // runs after every iteration, whether it passed or failed
+ }
+ }
}
- private void runTest(final int length, final int killPos,
- final int dnIndex, final boolean tokenExpire,
- final HdfsConfiguration conf) throws Exception {
- if (killPos <= FLUSH_POS) {
- LOG.warn("killPos=" + killPos + " <= FLUSH_POS=" + FLUSH_POS
- + ", length=" + length + ", dnIndex=" + dnIndex);
- return; //skip test
+ void runTestWithMultipleFailure(final int length) throws Exception { // multi-failure driver: one run per row of dnIndexSuite
+ final HdfsConfiguration conf = newHdfsConfiguration();
+ for(int i=0;i<dnIndexSuite.length;i++){
+ int[] dnIndex = dnIndexSuite[i];
+ int[] killPos = getKillPositions(length, dnIndex.length); // one kill position per datanode index in this row
+ try {
+ setup(conf);
+ runTest(length, killPos, dnIndex, false);
+ } catch (Throwable e) { // log which scenario failed, then rethrow the original failure unchanged
+ final String err = "failed, killPos=" + Arrays.toString(killPos)
+ + ", dnIndex=" + Arrays.toString(dnIndex) + ", length=" + length;
+ LOG.error(err);
+ throw e;
+ } finally {
+ tearDown(); // runs after every scenario, whether it passed or failed
+ }
}
- Preconditions.checkArgument(length > killPos,
- "length=%s <= killPos=%s", length, killPos);
+ }
- // start a datanode now, will kill one later
- cluster.startDataNodes(conf, 1, true, null, null);
- cluster.waitActive();
+ /**
+ * Implementation shared by the runTest variants.
+ * @param length file length
+ * @param killPos positions at which to kill a DN, in ascending order
+ * @param dnIndex index of the DN to kill at the corresponding kill position
+ * @param tokenExpire whether to wait for the token to expire before killing a DN
+ * @throws Exception
+ */
+ private void runTest(final int length, final int[] killPos,
+ final int[] dnIndex, final boolean tokenExpire) throws Exception {
+ if (killPos[0] <= FLUSH_POS) {
+ LOG.warn("killPos=" + Arrays.toString(killPos) + " <= FLUSH_POS=" + FLUSH_POS
+ + ", length=" + length + ", dnIndex=" + Arrays.toString(dnIndex));
+ return; //skip test
+ }
+ Preconditions.checkArgument(length > killPos[0], "length=%s <= killPos=%s",
+ length, killPos);
+ Preconditions.checkArgument(killPos.length == dnIndex.length);
- final Path p = new Path(dir, "dn" + dnIndex + "len" + length + "kill" + killPos);
+ final Path p = new Path(dir, "dn" + Arrays.toString(dnIndex)
+ + "len" + length + "kill" + Arrays.toString(killPos));
final String fullPath = p.toString();
LOG.info("fullPath=" + fullPath);
-
if (tokenExpire) {
final NameNode nn = cluster.getNameNode();
final BlockManager bm = nn.getNamesystem().getBlockManager();
@@ -308,50 +332,56 @@ public class TestDFSStripedOutputStreamWithFailure {
final DFSStripedOutputStream stripedOut
= (DFSStripedOutputStream)out.getWrappedStream();
- long oldGS = -1;
- boolean killed = false;
+ long firstGS = -1; // first GS of this block group which never proceeds blockRecovery
+ long oldGS = -1; // the old GS before bumping
+ int numKilled=0;
for(; pos.get() < length; ) {
final int i = pos.getAndIncrement();
- if (i == killPos) {
+ if (numKilled < killPos.length && i == killPos[numKilled]) {
+ assertTrue(firstGS != -1);
final long gs = getGenerationStamp(stripedOut);
- Assert.assertTrue(oldGS != -1);
- Assert.assertEquals(oldGS, gs);
+ if (numKilled == 0) {
+ assertEquals(firstGS, gs);
+ } else {
+ //TODO: implement hflush/hsync and verify gs strict greater than oldGS
+ assertTrue(gs >= oldGS);
+ }
+ oldGS = gs;
if (tokenExpire) {
DFSTestUtil.flushInternal(stripedOut);
waitTokenExpires(out);
}
- killDatanode(cluster, stripedOut, dnIndex, pos);
- killed = true;
+ killDatanode(cluster, stripedOut, dnIndex[numKilled], pos);
+ numKilled++;
}
write(out, i);
- if (i == FLUSH_POS) {
- oldGS = getGenerationStamp(stripedOut);
+ if (i % BLOCK_GROUP_SIZE == FLUSH_POS) {
+ firstGS = getGenerationStamp(stripedOut);
+ oldGS = firstGS;
}
}
out.close();
+ assertEquals(dnIndex.length, numKilled);
short expectedReported = StripedFileTestUtil.getRealTotalBlockNum(length);
- if (length > dnIndex * CELL_SIZE || dnIndex >= NUM_DATA_BLOCKS) {
- expectedReported--;
+ for(int idx :dnIndex) {
+ if (length > idx * CELL_SIZE || idx >= NUM_DATA_BLOCKS) {
+ expectedReported--;
+ }
}
DFSTestUtil.waitReplication(dfs, p, expectedReported);
- Assert.assertTrue(killed);
-
- // check file length
- final FileStatus status = dfs.getFileStatus(p);
- Assert.assertEquals(length, status.getLen());
-
- checkData(dfs, fullPath, length, dnIndex, oldGS);
+ cluster.triggerBlockReports();
+ StripedFileTestUtil.checkData(dfs, p, length, dnIndex, oldGS);
}
static void write(FSDataOutputStream out, int i) throws IOException {
try {
- out.write(getByte(i));
+ out.write(StripedFileTestUtil.getByte(i));
} catch(IOException ioe) {
throw new IOException("Failed at i=" + i, ioe);
}
@@ -359,10 +389,10 @@ public class TestDFSStripedOutputStreamWithFailure {
static long getGenerationStamp(DFSStripedOutputStream out)
throws IOException {
+ DFSTestUtil.flushBuffer(out);
final long gs = DFSTestUtil.flushInternal(out).getGenerationStamp();
LOG.info("getGenerationStamp returns " + gs);
return gs;
-
}
static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
@@ -399,106 +429,6 @@ public class TestDFSStripedOutputStreamWithFailure {
cluster.stopDataNode(datanode.getXferAddr());
}
- static void checkData(DistributedFileSystem dfs, String src, int length,
- int killedDnIndex, long oldGS) throws IOException {
- List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
- LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(src, 0L);
- final int expectedNumGroup = (length - 1)/BLOCK_GROUP_SIZE + 1;
- Assert.assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
-
- for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
- Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
-
- final long gs = firstBlock.getBlock().getGenerationStamp();
- final String s = "gs=" + gs + ", oldGS=" + oldGS;
- LOG.info(s);
- Assert.assertTrue(s, gs >= oldGS);
-
- LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
- (LocatedStripedBlock) firstBlock,
- CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
- blockGroupList.add(Arrays.asList(blocks));
- }
-
- // test each block group
- for (int group = 0; group < blockGroupList.size(); group++) {
- final boolean isLastGroup = group == blockGroupList.size() - 1;
- final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE
- : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE;
- final int numCellInGroup = (groupSize - 1)/CELL_SIZE + 1;
- final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS;
- final int lastCellSize = groupSize - (numCellInGroup - 1)*CELL_SIZE;
-
- //get the data of this block
- List<LocatedBlock> blockList = blockGroupList.get(group);
- byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
- byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
-
- // for each block, use BlockReader to read data
- for (int i = 0; i < blockList.size(); i++) {
- final int j = i >= NUM_DATA_BLOCKS? 0: i;
- final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS
- + (j <= lastCellIndex? 1: 0);
- final int blockSize = numCellInBlock*CELL_SIZE
- + (isLastGroup && j == lastCellIndex? lastCellSize - CELL_SIZE: 0);
-
- final byte[] blockBytes = new byte[blockSize];
- if (i < NUM_DATA_BLOCKS) {
- dataBlockBytes[i] = blockBytes;
- } else {
- parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes;
- }
-
- final LocatedBlock lb = blockList.get(i);
- LOG.info("i,j=" + i + ", " + j + ", numCellInBlock=" + numCellInBlock
- + ", blockSize=" + blockSize + ", lb=" + lb);
- if (lb == null) {
- continue;
- }
- final ExtendedBlock block = lb.getBlock();
- Assert.assertEquals(blockSize, block.getNumBytes());
-
-
- if (block.getNumBytes() == 0) {
- continue;
- }
-
- if (i != killedDnIndex) {
- final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
- dfs, lb, 0, block.getNumBytes());
- blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
- blockReader.close();
- }
- }
-
- // check data
- final int groupPosInFile = group*BLOCK_GROUP_SIZE;
- for (int i = 0; i < dataBlockBytes.length; i++) {
- final byte[] actual = dataBlockBytes[i];
- for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) {
- final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(
- CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile;
- Assert.assertTrue(posInFile < length);
- final byte expected = getByte(posInFile);
-
- if (i == killedDnIndex) {
- actual[posInBlk] = expected;
- } else {
- String s = "expected=" + expected + " but actual=" + actual[posInBlk]
- + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk
- + ". group=" + group + ", i=" + i;
- Assert.assertEquals(s, expected, actual[posInBlk]);
- }
- }
- }
-
- // check parity
- TestDFSStripedOutputStream.verifyParity(dfs.getConf(),
- lbs.getLocatedBlocks().get(group).getBlockSize(),
- CELL_SIZE, dataBlockBytes, parityBlockBytes,
- killedDnIndex - dataBlockBytes.length);
- }
- }
private void waitTokenExpires(FSDataOutputStream out) throws IOException {
Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
index c0dca4e..764527d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -39,6 +41,12 @@ public class TestWriteStripedFileWithFailure {
private static MiniDFSCluster cluster;
private static FileSystem fs;
private static Configuration conf = new HdfsConfiguration();
+
+ static {
+ GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+ }
+
private final short dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
private final short parityBlocks = StripedFileTestUtil.NUM_PARITY_BLOCKS;
private final int smallFileLength = blockSize * dataBlocks - 123;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 124bf80..ef31527 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -1745,7 +1745,7 @@ public class TestBalancer {
// verify locations of striped blocks
LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
- DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
// add one datanode
String newRack = "/rack" + (++numOfRacks);
@@ -1761,7 +1761,7 @@ public class TestBalancer {
// verify locations of striped blocks
locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
- DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
} finally {
cluster.shutdown();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 3a9748f..7cf5656 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -488,7 +488,7 @@ public class TestMover {
Assert.assertEquals(StorageType.DISK, type);
}
}
- DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+ StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
dataBlocks + parityBlocks);
// start 5 more datanodes
@@ -523,7 +523,7 @@ public class TestMover {
Assert.assertEquals(StorageType.ARCHIVE, type);
}
}
- DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+ StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
dataBlocks + parityBlocks);
}finally{
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
index 64d33a4..abcdbc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@@ -42,7 +41,6 @@ import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
public class TestAddOverReplicatedStripedBlocks {
@@ -64,6 +62,7 @@ public class TestAddOverReplicatedStripedBlocks {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
// disable block recovery
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
SimulatedFSDataset.setFactory(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.waitActive();
@@ -118,7 +117,7 @@ public class TestAddOverReplicatedStripedBlocks {
// verify that all internal blocks exists
lbs = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), 0, fileLen);
- DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
}
@Test
@@ -162,7 +161,7 @@ public class TestAddOverReplicatedStripedBlocks {
// verify that all internal blocks exists
lbs = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), 0, fileLen);
- DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
}
@Test
@@ -216,7 +215,7 @@ public class TestAddOverReplicatedStripedBlocks {
// verify that all internal blocks exists
lbs = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), 0, fileLen);
- DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
}
@Test
@@ -248,6 +247,7 @@ public class TestAddOverReplicatedStripedBlocks {
// update blocksMap
cluster.triggerBlockReports();
+ Thread.sleep(2000);
// add to invalidates
cluster.triggerHeartbeats();
// datanode delete block
@@ -259,7 +259,7 @@ public class TestAddOverReplicatedStripedBlocks {
// we are left GROUP_SIZE - 1 blocks.
lbs = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), 0, fileLen);
- DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index c27ead5..735f84d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@@ -736,7 +737,13 @@ public class TestRetryCacheWithHA {
DatanodeInfo[] newNodes = new DatanodeInfo[2];
newNodes[0] = nodes[0];
newNodes[1] = nodes[1];
- String[] storageIDs = {"s0", "s1"};
+ final DatanodeManager dm = cluster.getNamesystem(0).getBlockManager()
+ .getDatanodeManager();
+ final String storageID1 = dm.getDatanode(newNodes[0]).getStorageInfos()[0]
+ .getStorageID();
+ final String storageID2 = dm.getDatanode(newNodes[1]).getStorageInfos()[0]
+ .getStorageID();
+ String[] storageIDs = {storageID1, storageID2};
client.getNamenode().updatePipeline(client.getClientName(), oldBlock,
newBlock, newNodes, storageIDs);