Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/09/28 23:41:22 UTC
[1/2] hadoop git commit: HDFS-9040. Erasure coding: coordinate data
streamers in DFSStripedOutputStream. Contributed by Jing Zhao and Walter Su.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7285 c09dc258a -> 6419900ac
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 8d4a0cf..12453fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -20,23 +20,35 @@ package org.apache.hadoop.hdfs;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.junit.Assert;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Random;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.junit.Assert.assertEquals;
+
public class StripedFileTestUtil {
public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class);
/*
@@ -50,8 +62,8 @@ public class StripedFileTestUtil {
static final int stripesPerBlock = 4;
static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
static final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
+ static final int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
- static final Random random = new Random();
static byte[] generateBytes(int cnt) {
byte[] bytes = new byte[cnt];
@@ -61,6 +73,11 @@ public class StripedFileTestUtil {
return bytes;
}
+ static byte getByte(long pos) {
+ final int mod = 29;
+ return (byte) (pos % mod + 1);
+ }
+
static int readAll(FSDataInputStream in, byte[] buf) throws IOException {
int readLen = 0;
int ret;
@@ -71,15 +88,10 @@ public class StripedFileTestUtil {
return readLen;
}
- static byte getByte(long pos) {
- final int mod = 29;
- return (byte) (pos % mod + 1);
- }
-
static void verifyLength(FileSystem fs, Path srcPath, int fileLength)
throws IOException {
FileStatus status = fs.getFileStatus(srcPath);
- Assert.assertEquals("File length should be the same", fileLength, status.getLen());
+ assertEquals("File length should be the same", fileLength, status.getLen());
}
static void verifyPread(FileSystem fs, Path srcPath, int fileLength,
@@ -101,9 +113,7 @@ public class StripedFileTestUtil {
offset += target;
}
for (int i = 0; i < fileLength - startOffset; i++) {
- Assert.assertEquals("Byte at " + (startOffset + i) + " is different, "
- + "the startOffset is " + startOffset,
- expected[startOffset + i], result[i]);
+ assertEquals("Byte at " + (startOffset + i) + " is different, " + "the startOffset is " + startOffset, expected[startOffset + i], result[i]);
}
}
}
@@ -119,8 +129,7 @@ public class StripedFileTestUtil {
System.arraycopy(buf, 0, result, readLen, ret);
readLen += ret;
}
- Assert.assertEquals("The length of file should be the same to write size",
- fileLength, readLen);
+ assertEquals("The length of file should be the same to write size", fileLength, readLen);
Assert.assertArrayEquals(expected, result);
}
}
@@ -137,8 +146,7 @@ public class StripedFileTestUtil {
result.put(buf);
buf.clear();
}
- Assert.assertEquals("The length of file should be the same to write size",
- fileLength, readLen);
+ assertEquals("The length of file should be the same to write size", fileLength, readLen);
Assert.assertArrayEquals(expected, result.array());
}
}
@@ -199,10 +207,9 @@ public class StripedFileTestUtil {
fsdis.seek(pos);
byte[] buf = new byte[writeBytes];
int readLen = StripedFileTestUtil.readAll(fsdis, buf);
- Assert.assertEquals(readLen, writeBytes - pos);
+ assertEquals(readLen, writeBytes - pos);
for (int i = 0; i < readLen; i++) {
- Assert.assertEquals("Byte at " + i + " should be the same",
- StripedFileTestUtil.getByte(pos + i), buf[i]);
+ assertEquals("Byte at " + i + " should be the same", StripedFileTestUtil.getByte(pos + i), buf[i]);
}
}
@@ -210,6 +217,7 @@ public class StripedFileTestUtil {
final int dnIndex, final AtomicInteger pos) {
final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
final DatanodeInfo datanode = getDatanodes(s);
+ assert datanode != null;
LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
cluster.stopDataNode(datanode.getXferAddr());
}
@@ -218,7 +226,7 @@ public class StripedFileTestUtil {
for(;;) {
final DatanodeInfo[] datanodes = streamer.getNodes();
if (datanodes != null) {
- Assert.assertEquals(1, datanodes.length);
+ assertEquals(1, datanodes.length);
Assert.assertNotNull(datanodes[0]);
return datanodes[0];
}
@@ -287,7 +295,6 @@ public class StripedFileTestUtil {
* @param min minimum of the range
* @param max maximum of the range
* @param n number to be generated
- * @return
*/
public static int[] randomArray(int min, int max, int n){
if (n > (max - min + 1) || max < min || min < 0 || max < 0) {
@@ -315,4 +322,170 @@ public class StripedFileTestUtil {
}
return result;
}
+
+ /**
+ * Verify that blocks in striped block group are on different nodes, and every
+ * internal blocks exists.
+ */
+ public static void verifyLocatedStripedBlocks(LocatedBlocks lbs, int groupSize) {
+ for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+ assert lb instanceof LocatedStripedBlock;
+ HashSet<DatanodeInfo> locs = new HashSet<>();
+ Collections.addAll(locs, lb.getLocations());
+ assertEquals(groupSize, lb.getLocations().length);
+ assertEquals(groupSize, locs.size());
+
+ // verify that every internal blocks exists
+ int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
+ assertEquals(groupSize, blockIndices.length);
+ HashSet<Integer> found = new HashSet<>();
+ for (int index : blockIndices) {
+ assert index >=0;
+ found.add(index);
+ }
+ assertEquals(groupSize, found.size());
+ }
+ }
+
+ static void checkData(DistributedFileSystem dfs, Path srcPath, int length,
+ int[] killedDnIndex, long oldGS) throws IOException {
+
+ StripedFileTestUtil.verifyLength(dfs, srcPath, length);
+ Arrays.sort(killedDnIndex);
+ List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
+ LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(srcPath.toString(), 0L,
+ Long.MAX_VALUE);
+ int expectedNumGroup = 0;
+ if (length > 0) {
+ expectedNumGroup = (length - 1) / BLOCK_GROUP_SIZE + 1;
+ }
+ assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
+
+ for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
+ Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
+
+ final long gs = firstBlock.getBlock().getGenerationStamp();
+ final String s = "gs=" + gs + ", oldGS=" + oldGS;
+ LOG.info(s);
+ Assert.assertTrue(s, gs >= oldGS);
+
+ LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+ (LocatedStripedBlock) firstBlock, BLOCK_STRIPED_CELL_SIZE,
+ NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+ blockGroupList.add(Arrays.asList(blocks));
+ }
+
+ // test each block group
+ for (int group = 0; group < blockGroupList.size(); group++) {
+ final boolean isLastGroup = group == blockGroupList.size() - 1;
+ final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE
+ : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE;
+ final int numCellInGroup = (groupSize - 1)/BLOCK_STRIPED_CELL_SIZE + 1;
+ final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS;
+ final int lastCellSize = groupSize - (numCellInGroup - 1)*BLOCK_STRIPED_CELL_SIZE;
+
+ //get the data of this block
+ List<LocatedBlock> blockList = blockGroupList.get(group);
+ byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
+ byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
+
+ // for each block, use BlockReader to read data
+ for (int i = 0; i < blockList.size(); i++) {
+ final int j = i >= NUM_DATA_BLOCKS? 0: i;
+ final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS
+ + (j <= lastCellIndex? 1: 0);
+ final int blockSize = numCellInBlock*BLOCK_STRIPED_CELL_SIZE
+ + (isLastGroup && j == lastCellIndex? lastCellSize - BLOCK_STRIPED_CELL_SIZE: 0);
+
+ final byte[] blockBytes = new byte[blockSize];
+ if (i < NUM_DATA_BLOCKS) {
+ dataBlockBytes[i] = blockBytes;
+ } else {
+ parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes;
+ }
+
+ final LocatedBlock lb = blockList.get(i);
+ LOG.info("i,j=" + i + ", " + j + ", numCellInBlock=" + numCellInBlock
+ + ", blockSize=" + blockSize + ", lb=" + lb);
+ if (lb == null) {
+ continue;
+ }
+ final ExtendedBlock block = lb.getBlock();
+ assertEquals(blockSize, block.getNumBytes());
+
+ if (block.getNumBytes() == 0) {
+ continue;
+ }
+
+ if (Arrays.binarySearch(killedDnIndex, i) < 0) {
+ final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
+ dfs, lb, 0, block.getNumBytes());
+ blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
+ blockReader.close();
+ }
+ }
+
+ // check data
+ final int groupPosInFile = group*BLOCK_GROUP_SIZE;
+ for (int i = 0; i < dataBlockBytes.length; i++) {
+ boolean killed = false;
+ if (Arrays.binarySearch(killedDnIndex, i) >= 0){
+ killed = true;
+ }
+ final byte[] actual = dataBlockBytes[i];
+ for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) {
+ final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(
+ BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile;
+ Assert.assertTrue(posInFile < length);
+ final byte expected = getByte(posInFile);
+
+ if (killed) {
+ actual[posInBlk] = expected;
+ } else {
+ if(expected != actual[posInBlk]){
+ String s = "expected=" + expected + " but actual=" + actual[posInBlk]
+ + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk
+ + ". group=" + group + ", i=" + i;
+ Assert.fail(s);
+ }
+ }
+ }
+ }
+
+ // check parity
+ verifyParityBlocks(dfs.getConf(), lbs.getLocatedBlocks().get(group)
+ .getBlockSize(),
+ BLOCK_STRIPED_CELL_SIZE, dataBlockBytes, parityBlockBytes, killedDnIndex);
+ }
+ }
+
+ static void verifyParityBlocks(Configuration conf, final long size, final int cellSize,
+ byte[][] dataBytes, byte[][] parityBytes, int[] killedDnIndex) {
+ Arrays.sort(killedDnIndex);
+ // verify the parity blocks
+ int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
+ size, cellSize, dataBytes.length, dataBytes.length);
+ final byte[][] expectedParityBytes = new byte[parityBytes.length][];
+ for (int i = 0; i < parityBytes.length; i++) {
+ expectedParityBytes[i] = new byte[parityBlkSize];
+ }
+ for (int i = 0; i < dataBytes.length; i++) {
+ if (dataBytes[i] == null) {
+ dataBytes[i] = new byte[dataBytes[0].length];
+ } else if (dataBytes[i].length < dataBytes[0].length) {
+ final byte[] tmp = dataBytes[i];
+ dataBytes[i] = new byte[dataBytes[0].length];
+ System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length);
+ }
+ }
+ final RawErasureEncoder encoder =
+ CodecUtil.createRSRawEncoder(conf, dataBytes.length, parityBytes.length);
+ encoder.encode(dataBytes, expectedParityBytes);
+ for (int i = 0; i < parityBytes.length; i++) {
+ if (Arrays.binarySearch(killedDnIndex, dataBytes.length + i) < 0){
+ Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + Arrays.toString(killedDnIndex),
+ expectedParityBytes[i], parityBytes[i]);
+ }
+ }
+ }
}
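
For reference, the per-internal-block size arithmetic that checkData applies to the last (possibly partial) block group can be reproduced in isolation. The following standalone sketch assumes a 6-data/3-parity layout with 64 KiB cells and 4 stripes per block; these are illustrative values, not read from this patch, and the class is not part of the change:

// Standalone sketch (not part of the patch): reproduces the per-internal-block
// size arithmetic used by StripedFileTestUtil#checkData, under the assumption
// of a 6-data/3-parity layout with 64 KiB cells and 4 stripes per block.
public class BlockGroupGeometrySketch {
  static final int CELL_SIZE = 64 * 1024;        // assumed cell size
  static final int DATA_BLOCKS = 6;              // assumed data blocks per group
  static final int BLOCK_SIZE = CELL_SIZE * 4;   // stripesPerBlock = 4
  static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * DATA_BLOCKS;

  /** Expected byte at a given file offset, mirroring StripedFileTestUtil#getByte. */
  static byte expectedByte(long pos) {
    return (byte) (pos % 29 + 1);
  }

  /** Size of internal data block j in the last (possibly partial) block group. */
  static int lastGroupInternalBlockSize(int fileLength, int j) {
    int numGroups = (fileLength - 1) / BLOCK_GROUP_SIZE + 1;
    int groupSize = fileLength - (numGroups - 1) * BLOCK_GROUP_SIZE;
    int numCellInGroup = (groupSize - 1) / CELL_SIZE + 1;
    int lastCellIndex = (numCellInGroup - 1) % DATA_BLOCKS;
    int lastCellSize = groupSize - (numCellInGroup - 1) * CELL_SIZE;
    int numCellInBlock = (numCellInGroup - 1) / DATA_BLOCKS
        + (j <= lastCellIndex ? 1 : 0);
    return numCellInBlock * CELL_SIZE
        + (j == lastCellIndex ? lastCellSize - CELL_SIZE : 0);
  }

  public static void main(String[] args) {
    int length = CELL_SIZE * DATA_BLOCKS + 123;   // one full stripe plus 123 bytes
    for (int j = 0; j < DATA_BLOCKS; j++) {
      System.out.println("block " + j + " -> "
          + lastGroupInternalBlockSize(length, j) + " bytes");
    }
  }
}

Running the sketch for a file one full stripe plus 123 bytes long shows block 0 holding one full cell plus the 123-byte tail cell while blocks 1 through 5 hold exactly one cell each, which is the layout checkData then verifies byte by byte via getByte.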
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index 0641e8e..d78e88b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -18,26 +18,14 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.apache.hadoop.io.erasurecode.CodecUtil;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -154,141 +142,15 @@ public class TestDFSStripedOutputStream {
+ cellSize + 123);
}
- private byte[] generateBytes(int cnt) {
- byte[] bytes = new byte[cnt];
- for (int i = 0; i < cnt; i++) {
- bytes[i] = getByte(i);
- }
- return bytes;
- }
-
- private byte getByte(long pos) {
- int mod = 29;
- return (byte) (pos % mod + 1);
- }
-
private void testOneFile(String src, int writeBytes) throws Exception {
src += "_" + writeBytes;
Path testPath = new Path(src);
- byte[] bytes = generateBytes(writeBytes);
+ byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes);
DFSTestUtil.writeFile(fs, testPath, new String(bytes));
StripedFileTestUtil.waitBlockGroupsReported(fs, src);
- // check file length
- FileStatus status = fs.getFileStatus(testPath);
- Assert.assertEquals(writeBytes, status.getLen());
-
- checkData(src, writeBytes);
- }
-
- void checkData(String src, int writeBytes) throws IOException {
- List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
- LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
-
- for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
- Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
- LocatedBlock[] blocks = StripedBlockUtil.
- parseStripedBlockGroup((LocatedStripedBlock) firstBlock,
- cellSize, dataBlocks, parityBlocks);
- List<LocatedBlock> oneGroup = Arrays.asList(blocks);
- blockGroupList.add(oneGroup);
- }
-
- // test each block group
- for (int group = 0; group < blockGroupList.size(); group++) {
- //get the data of this block
- List<LocatedBlock> blockList = blockGroupList.get(group);
- byte[][] dataBlockBytes = new byte[dataBlocks][];
- byte[][] parityBlockBytes = new byte[parityBlocks][];
-
- // for each block, use BlockReader to read data
- for (int i = 0; i < blockList.size(); i++) {
- LocatedBlock lblock = blockList.get(i);
- if (lblock == null) {
- continue;
- }
- ExtendedBlock block = lblock.getBlock();
- byte[] blockBytes = new byte[(int)block.getNumBytes()];
- if (i < dataBlocks) {
- dataBlockBytes[i] = blockBytes;
- } else {
- parityBlockBytes[i - dataBlocks] = blockBytes;
- }
-
- if (block.getNumBytes() == 0) {
- continue;
- }
-
- final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
- fs, lblock, 0, block.getNumBytes());
- blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
- blockReader.close();
- }
-
- // check if we write the data correctly
- for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length;
- blkIdxInGroup++) {
- final byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup];
- if (actualBlkBytes == null) {
- continue;
- }
- for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) {
- // calculate the position of this byte in the file
- long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize,
- dataBlocks, posInBlk, blkIdxInGroup) +
- group * blockSize * dataBlocks;
- Assert.assertTrue(posInFile < writeBytes);
- final byte expected = getByte(posInFile);
-
- String s = "Unexpected byte " + actualBlkBytes[posInBlk]
- + ", expect " + expected
- + ". Block group index is " + group
- + ", stripe index is " + posInBlk / cellSize
- + ", cell index is " + blkIdxInGroup
- + ", byte index is " + posInBlk % cellSize;
- Assert.assertEquals(s, expected, actualBlkBytes[posInBlk]);
- }
- }
-
- verifyParity(lbs.getLocatedBlocks().get(group).getBlockSize(),
- cellSize, dataBlockBytes, parityBlockBytes);
- }
- }
-
- void verifyParity(final long size, final int cellSize,
- byte[][] dataBytes, byte[][] parityBytes) {
- verifyParity(conf, size, cellSize, dataBytes, parityBytes, -1);
- }
-
- static void verifyParity(Configuration conf, final long size,
- final int cellSize, byte[][] dataBytes,
- byte[][] parityBytes, int killedDnIndex) {
- // verify the parity blocks
- int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
- size, cellSize, dataBytes.length, dataBytes.length);
- final byte[][] expectedParityBytes = new byte[parityBytes.length][];
- for (int i = 0; i < parityBytes.length; i++) {
- expectedParityBytes[i] = new byte[parityBlkSize];
- }
- for (int i = 0; i < dataBytes.length; i++) {
- if (dataBytes[i] == null) {
- dataBytes[i] = new byte[dataBytes[0].length];
- } else if (dataBytes[i].length < dataBytes[0].length) {
- final byte[] tmp = dataBytes[i];
- dataBytes[i] = new byte[dataBytes[0].length];
- System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length);
- }
- }
- final RawErasureEncoder encoder =
- CodecUtil.createRSRawEncoder(conf,
- dataBytes.length, parityBytes.length);
- encoder.encode(dataBytes, expectedParityBytes);
- for (int i = 0; i < parityBytes.length; i++) {
- if (i != killedDnIndex) {
- Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + killedDnIndex,
- expectedParityBytes[i], parityBytes[i]);
- }
- }
+ StripedFileTestUtil.checkData(fs, testPath, writeBytes,
+ new int[]{}, 0);
}
}
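
The verifyParity logic deleted here now lives in StripedFileTestUtil.verifyParityBlocks (see the first file in this diff). Its key preparatory step is padding missing or short internal data blocks with zeros so every array handed to the RS encoder has the same length. A minimal standalone sketch of just that normalization step follows; the actual re-encoding is left as a comment because it requires the Hadoop erasure-code classes (CodecUtil.createRSRawEncoder in the patch):

import java.util.Arrays;

// Standalone sketch of the padding step performed before re-encoding parity:
// every data-block byte[] is brought up to the length of block 0 by
// zero-filling, mirroring the loop in verifyParityBlocks above.
class ParityPaddingSketch {
  static void padDataBlocks(byte[][] dataBytes) {
    for (int i = 0; i < dataBytes.length; i++) {
      if (dataBytes[i] == null) {
        // a block that was never read (e.g. its DN was killed) becomes all zeros
        dataBytes[i] = new byte[dataBytes[0].length];
      } else if (dataBytes[i].length < dataBytes[0].length) {
        dataBytes[i] = Arrays.copyOf(dataBytes[i], dataBytes[0].length);
      }
    }
    // At this point the real test hands dataBytes to an RS raw encoder
    // (CodecUtil.createRSRawEncoder in the patch) and compares the produced
    // parity with what the DataNodes actually stored.
  }

  public static void main(String[] args) {
    byte[][] data = { new byte[]{1, 2, 3, 4}, new byte[]{5, 6}, null };
    padDataBlocks(data);
    for (byte[] b : data) {
      System.out.println(Arrays.toString(b));
    }
  }
}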
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 44a29e6..f6c2566 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
@@ -30,23 +31,18 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
@@ -74,6 +70,7 @@ public class TestDFSStripedOutputStreamWithFailure {
private static final int FLUSH_POS
= 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
+
static {
System.out.println("NUM_DATA_BLOCKS = " + NUM_DATA_BLOCKS);
System.out.println("NUM_PARITY_BLOCKS= " + NUM_PARITY_BLOCKS);
@@ -101,6 +98,32 @@ public class TestDFSStripedOutputStreamWithFailure {
return lengths;
}
+ private static final int[][] dnIndexSuite = {
+ {0, 1},
+ {0, 5},
+ {0, 6},
+ {0, 8},
+ {1, 5},
+ {1, 6},
+ {6, 8},
+ {0, 1, 2},
+ {3, 4, 5},
+ {0, 1, 6},
+ {0, 5, 6},
+ {0, 5, 8},
+ {0, 6, 7},
+ {5, 6, 7},
+ {6, 7, 8},
+ };
+
+ private int[] getKillPositions(int fileLen, int num) {
+ int[] positions = new int[num];
+ for (int i = 0; i < num; i++) {
+ positions[i] = fileLen * (i + 1) / (num + 1);
+ }
+ return positions;
+ }
+
private static final List<Integer> LENGTHS = newLengths();
static int getLength(int i) {
@@ -127,42 +150,26 @@ public class TestDFSStripedOutputStreamWithFailure {
}
}
- private static byte getByte(long pos) {
- return (byte)pos;
- }
-
private HdfsConfiguration newHdfsConfiguration() {
final HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
- conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
+ conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
return conf;
}
- void runTest(final int length) {
- final HdfsConfiguration conf = newHdfsConfiguration();
- for (int dn = 0; dn < 9; dn++) {
- try {
- setup(conf);
- runTest(length, dn, false, conf);
- } catch (Exception e) {
- final String err = "failed, dn=" + dn + ", length=" + length
- + StringUtils.stringifyException(e);
- LOG.error(err);
- Assert.fail(err);
- } finally {
- tearDown();
- }
- }
- }
-
@Test(timeout=240000)
public void testDatanodeFailure56() throws Exception {
runTest(getLength(56));
}
@Test(timeout=240000)
+ public void testMultipleDatanodeFailure56() throws Exception {
+ runTestWithMultipleFailure(getLength(56));
+ }
+
+ @Test(timeout=240000)
public void testBlockTokenExpired() throws Exception {
final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE);
final HdfsConfiguration conf = newHdfsConfiguration();
@@ -174,7 +181,7 @@ public class TestDFSStripedOutputStreamWithFailure {
for (int dn = 0; dn < 9; dn += 2) {
try {
setup(conf);
- runTest(length, dn, true, conf);
+ runTest(length, new int[]{length/2}, new int[]{dn}, true);
} catch (Exception e) {
LOG.error("failed, dn=" + dn + ", length=" + length);
throw e;
@@ -214,22 +221,8 @@ public class TestDFSStripedOutputStreamWithFailure {
Assert.fail("Failed to validate available dns against blkGroupSize");
} catch (IOException ioe) {
// expected
- GenericTestUtils.assertExceptionContains("Failed: the number of "
- + "remaining blocks = 5 < the number of data blocks = 6", ioe);
- DFSStripedOutputStream dfsout = (DFSStripedOutputStream) out
- .getWrappedStream();
-
- // get leading streamer and verify the last exception
- StripedDataStreamer datastreamer = dfsout.getStripedDataStreamer(0);
- try {
- datastreamer.getLastException().check(true);
- Assert.fail("Failed to validate available dns against blkGroupSize");
- } catch (IOException le) {
- GenericTestUtils.assertExceptionContains(
- "Failed to get datablocks number of nodes from"
- + " namenode: blockGroupSize= 9, blocks.length= "
- + numDatanodes, le);
- }
+ GenericTestUtils.assertExceptionContains("Failed to get 6 nodes from" +
+ " namenode: blockGroupSize= 9, blocks.length= 5", ioe);
}
} finally {
tearDown();
@@ -258,42 +251,73 @@ public class TestDFSStripedOutputStreamWithFailure {
int fileLength = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE - 1000;
final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
DFSTestUtil.writeFile(dfs, srcPath, new String(expected));
+ LOG.info("writing finished. Seek and read the file to verify.");
StripedFileTestUtil.verifySeek(dfs, srcPath, fileLength);
} finally {
tearDown();
}
}
- private void runTest(final int length, final int dnIndex,
- final boolean tokenExpire, final HdfsConfiguration conf) {
- try {
- runTest(length, length/2, dnIndex, tokenExpire, conf);
- } catch(Exception e) {
- LOG.info("FAILED", e);
- Assert.fail(StringUtils.stringifyException(e));
+ void runTest(final int length) {
+ final HdfsConfiguration conf = newHdfsConfiguration();
+ for (int dn = 0; dn < 9; dn++) {
+ try {
+ setup(conf);
+ runTest(length, new int[]{length/2}, new int[]{dn}, false);
+ } catch (Throwable e) {
+ final String err = "failed, dn=" + dn + ", length=" + length
+ + StringUtils.stringifyException(e);
+ LOG.error(err);
+ Assert.fail(err);
+ } finally {
+ tearDown();
+ }
}
}
- private void runTest(final int length, final int killPos,
- final int dnIndex, final boolean tokenExpire,
- final HdfsConfiguration conf) throws Exception {
- if (killPos <= FLUSH_POS) {
- LOG.warn("killPos=" + killPos + " <= FLUSH_POS=" + FLUSH_POS
- + ", length=" + length + ", dnIndex=" + dnIndex);
- return; //skip test
+ void runTestWithMultipleFailure(final int length) throws Exception {
+ final HdfsConfiguration conf = newHdfsConfiguration();
+ for(int i=0;i<dnIndexSuite.length;i++){
+ int[] dnIndex = dnIndexSuite[i];
+ int[] killPos = getKillPositions(length, dnIndex.length);
+ try {
+ setup(conf);
+ runTest(length, killPos, dnIndex, false);
+ } catch (Throwable e) {
+ final String err = "failed, killPos=" + Arrays.toString(killPos)
+ + ", dnIndex=" + Arrays.toString(dnIndex) + ", length=" + length;
+ LOG.error(err);
+ throw e;
+ } finally {
+ tearDown();
+ }
}
- Preconditions.checkArgument(length > killPos,
- "length=%s <= killPos=%s", length, killPos);
+ }
- // start a datanode now, will kill one later
- cluster.startDataNodes(conf, 1, true, null, null);
- cluster.waitActive();
+ /**
+ * runTest implementation
+ * @param length file length
+ * @param killPos killing positions in ascending order
+ * @param dnIndex DN index to kill when meets killing positions
+ * @param tokenExpire wait token to expire when kill a DN
+ * @throws Exception
+ */
+ private void runTest(final int length, final int[] killPos,
+ final int[] dnIndex, final boolean tokenExpire) throws Exception {
+ if (killPos[0] <= FLUSH_POS) {
+ LOG.warn("killPos=" + Arrays.toString(killPos) + " <= FLUSH_POS=" + FLUSH_POS
+ + ", length=" + length + ", dnIndex=" + Arrays.toString(dnIndex));
+ return; //skip test
+ }
+ Preconditions.checkArgument(length > killPos[0], "length=%s <= killPos=%s",
+ length, killPos);
+ Preconditions.checkArgument(killPos.length == dnIndex.length);
- final Path p = new Path(dir, "dn" + dnIndex + "len" + length + "kill" + killPos);
+ final Path p = new Path(dir, "dn" + Arrays.toString(dnIndex)
+ + "len" + length + "kill" + Arrays.toString(killPos));
final String fullPath = p.toString();
LOG.info("fullPath=" + fullPath);
-
if (tokenExpire) {
final NameNode nn = cluster.getNameNode();
final BlockManager bm = nn.getNamesystem().getBlockManager();
@@ -308,50 +332,56 @@ public class TestDFSStripedOutputStreamWithFailure {
final DFSStripedOutputStream stripedOut
= (DFSStripedOutputStream)out.getWrappedStream();
- long oldGS = -1;
- boolean killed = false;
+ long firstGS = -1; // first GS of this block group which never proceeds blockRecovery
+ long oldGS = -1; // the old GS before bumping
+ int numKilled=0;
for(; pos.get() < length; ) {
final int i = pos.getAndIncrement();
- if (i == killPos) {
+ if (numKilled < killPos.length && i == killPos[numKilled]) {
+ assertTrue(firstGS != -1);
final long gs = getGenerationStamp(stripedOut);
- Assert.assertTrue(oldGS != -1);
- Assert.assertEquals(oldGS, gs);
+ if (numKilled == 0) {
+ assertEquals(firstGS, gs);
+ } else {
+ //TODO: implement hflush/hsync and verify gs strict greater than oldGS
+ assertTrue(gs >= oldGS);
+ }
+ oldGS = gs;
if (tokenExpire) {
DFSTestUtil.flushInternal(stripedOut);
waitTokenExpires(out);
}
- killDatanode(cluster, stripedOut, dnIndex, pos);
- killed = true;
+ killDatanode(cluster, stripedOut, dnIndex[numKilled], pos);
+ numKilled++;
}
write(out, i);
- if (i == FLUSH_POS) {
- oldGS = getGenerationStamp(stripedOut);
+ if (i % BLOCK_GROUP_SIZE == FLUSH_POS) {
+ firstGS = getGenerationStamp(stripedOut);
+ oldGS = firstGS;
}
}
out.close();
+ assertEquals(dnIndex.length, numKilled);
short expectedReported = StripedFileTestUtil.getRealTotalBlockNum(length);
- if (length > dnIndex * CELL_SIZE || dnIndex >= NUM_DATA_BLOCKS) {
- expectedReported--;
+ for(int idx :dnIndex) {
+ if (length > idx * CELL_SIZE || idx >= NUM_DATA_BLOCKS) {
+ expectedReported--;
+ }
}
DFSTestUtil.waitReplication(dfs, p, expectedReported);
- Assert.assertTrue(killed);
-
- // check file length
- final FileStatus status = dfs.getFileStatus(p);
- Assert.assertEquals(length, status.getLen());
-
- checkData(dfs, fullPath, length, dnIndex, oldGS);
+ cluster.triggerBlockReports();
+ StripedFileTestUtil.checkData(dfs, p, length, dnIndex, oldGS);
}
static void write(FSDataOutputStream out, int i) throws IOException {
try {
- out.write(getByte(i));
+ out.write(StripedFileTestUtil.getByte(i));
} catch(IOException ioe) {
throw new IOException("Failed at i=" + i, ioe);
}
@@ -359,10 +389,10 @@ public class TestDFSStripedOutputStreamWithFailure {
static long getGenerationStamp(DFSStripedOutputStream out)
throws IOException {
+ DFSTestUtil.flushBuffer(out);
final long gs = DFSTestUtil.flushInternal(out).getGenerationStamp();
LOG.info("getGenerationStamp returns " + gs);
return gs;
-
}
static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
@@ -399,106 +429,6 @@ public class TestDFSStripedOutputStreamWithFailure {
cluster.stopDataNode(datanode.getXferAddr());
}
- static void checkData(DistributedFileSystem dfs, String src, int length,
- int killedDnIndex, long oldGS) throws IOException {
- List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
- LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(src, 0L);
- final int expectedNumGroup = (length - 1)/BLOCK_GROUP_SIZE + 1;
- Assert.assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
-
- for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
- Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
-
- final long gs = firstBlock.getBlock().getGenerationStamp();
- final String s = "gs=" + gs + ", oldGS=" + oldGS;
- LOG.info(s);
- Assert.assertTrue(s, gs >= oldGS);
-
- LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
- (LocatedStripedBlock) firstBlock,
- CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
- blockGroupList.add(Arrays.asList(blocks));
- }
-
- // test each block group
- for (int group = 0; group < blockGroupList.size(); group++) {
- final boolean isLastGroup = group == blockGroupList.size() - 1;
- final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE
- : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE;
- final int numCellInGroup = (groupSize - 1)/CELL_SIZE + 1;
- final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS;
- final int lastCellSize = groupSize - (numCellInGroup - 1)*CELL_SIZE;
-
- //get the data of this block
- List<LocatedBlock> blockList = blockGroupList.get(group);
- byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
- byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
-
- // for each block, use BlockReader to read data
- for (int i = 0; i < blockList.size(); i++) {
- final int j = i >= NUM_DATA_BLOCKS? 0: i;
- final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS
- + (j <= lastCellIndex? 1: 0);
- final int blockSize = numCellInBlock*CELL_SIZE
- + (isLastGroup && j == lastCellIndex? lastCellSize - CELL_SIZE: 0);
-
- final byte[] blockBytes = new byte[blockSize];
- if (i < NUM_DATA_BLOCKS) {
- dataBlockBytes[i] = blockBytes;
- } else {
- parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes;
- }
-
- final LocatedBlock lb = blockList.get(i);
- LOG.info("i,j=" + i + ", " + j + ", numCellInBlock=" + numCellInBlock
- + ", blockSize=" + blockSize + ", lb=" + lb);
- if (lb == null) {
- continue;
- }
- final ExtendedBlock block = lb.getBlock();
- Assert.assertEquals(blockSize, block.getNumBytes());
-
-
- if (block.getNumBytes() == 0) {
- continue;
- }
-
- if (i != killedDnIndex) {
- final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
- dfs, lb, 0, block.getNumBytes());
- blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
- blockReader.close();
- }
- }
-
- // check data
- final int groupPosInFile = group*BLOCK_GROUP_SIZE;
- for (int i = 0; i < dataBlockBytes.length; i++) {
- final byte[] actual = dataBlockBytes[i];
- for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) {
- final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(
- CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile;
- Assert.assertTrue(posInFile < length);
- final byte expected = getByte(posInFile);
-
- if (i == killedDnIndex) {
- actual[posInBlk] = expected;
- } else {
- String s = "expected=" + expected + " but actual=" + actual[posInBlk]
- + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk
- + ". group=" + group + ", i=" + i;
- Assert.assertEquals(s, expected, actual[posInBlk]);
- }
- }
- }
-
- // check parity
- TestDFSStripedOutputStream.verifyParity(dfs.getConf(),
- lbs.getLocatedBlocks().get(group).getBlockSize(),
- CELL_SIZE, dataBlockBytes, parityBlockBytes,
- killedDnIndex - dataBlockBytes.length);
- }
- }
private void waitTokenExpires(FSDataOutputStream out) throws IOException {
Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
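
Two pieces of arithmetic drive the new multi-failure coverage above: getKillPositions spaces the kill points evenly across the file, and the expected number of reported blocks is reduced once for every killed DN that either holds file data or is a parity node. The standalone sketch below mirrors both calculations, again assuming 6 data blocks and 64 KiB cells purely for illustration:

// Standalone sketch (not part of the patch) of the arithmetic used by
// TestDFSStripedOutputStreamWithFailure: evenly spaced kill positions and the
// adjustment of the expected reported-block count for killed DNs.
class FailureTestArithmeticSketch {
  static final int CELL_SIZE = 64 * 1024;  // assumed cell size
  static final int NUM_DATA_BLOCKS = 6;    // assumed data blocks per group

  /** Mirrors getKillPositions: kill point k sits at fileLen*(k+1)/(num+1). */
  static int[] killPositions(int fileLen, int num) {
    int[] positions = new int[num];
    for (int i = 0; i < num; i++) {
      positions[i] = fileLen * (i + 1) / (num + 1);
    }
    return positions;
  }

  /**
   * Mirrors the expectedReported adjustment: a killed data DN only costs a
   * reported block if the file is long enough to have reached it; a killed
   * parity DN always does.
   */
  static int expectedReported(int baseline, int length, int[] killedDnIndex) {
    int expected = baseline;
    for (int idx : killedDnIndex) {
      if (length > idx * CELL_SIZE || idx >= NUM_DATA_BLOCKS) {
        expected--;
      }
    }
    return expected;
  }

  public static void main(String[] args) {
    int length = CELL_SIZE * NUM_DATA_BLOCKS + 123;
    System.out.println(java.util.Arrays.toString(killPositions(length, 2)));
    System.out.println(expectedReported(7, length, new int[]{0, 6}));
  }
}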
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
index c0dca4e..764527d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -39,6 +41,12 @@ public class TestWriteStripedFileWithFailure {
private static MiniDFSCluster cluster;
private static FileSystem fs;
private static Configuration conf = new HdfsConfiguration();
+
+ static {
+ GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+ }
+
private final short dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
private final short parityBlocks = StripedFileTestUtil.NUM_PARITY_BLOCKS;
private final int smallFileLength = blockSize * dataBlocks - 123;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 124bf80..ef31527 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -1745,7 +1745,7 @@ public class TestBalancer {
// verify locations of striped blocks
LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
- DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
// add one datanode
String newRack = "/rack" + (++numOfRacks);
@@ -1761,7 +1761,7 @@ public class TestBalancer {
// verify locations of striped blocks
locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
- DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
} finally {
cluster.shutdown();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 3a9748f..7cf5656 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -488,7 +488,7 @@ public class TestMover {
Assert.assertEquals(StorageType.DISK, type);
}
}
- DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+ StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
dataBlocks + parityBlocks);
// start 5 more datanodes
@@ -523,7 +523,7 @@ public class TestMover {
Assert.assertEquals(StorageType.ARCHIVE, type);
}
}
- DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+ StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
dataBlocks + parityBlocks);
}finally{
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
index 64d33a4..abcdbc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@@ -42,7 +41,6 @@ import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
public class TestAddOverReplicatedStripedBlocks {
@@ -64,6 +62,7 @@ public class TestAddOverReplicatedStripedBlocks {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
// disable block recovery
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
SimulatedFSDataset.setFactory(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.waitActive();
@@ -118,7 +117,7 @@ public class TestAddOverReplicatedStripedBlocks {
// verify that all internal blocks exists
lbs = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), 0, fileLen);
- DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
}
@Test
@@ -162,7 +161,7 @@ public class TestAddOverReplicatedStripedBlocks {
// verify that all internal blocks exists
lbs = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), 0, fileLen);
- DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
}
@Test
@@ -216,7 +215,7 @@ public class TestAddOverReplicatedStripedBlocks {
// verify that all internal blocks exists
lbs = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), 0, fileLen);
- DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
}
@Test
@@ -248,6 +247,7 @@ public class TestAddOverReplicatedStripedBlocks {
// update blocksMap
cluster.triggerBlockReports();
+ Thread.sleep(2000);
// add to invalidates
cluster.triggerHeartbeats();
// datanode delete block
@@ -259,7 +259,7 @@ public class TestAddOverReplicatedStripedBlocks {
// we are left GROUP_SIZE - 1 blocks.
lbs = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), 0, fileLen);
- DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+ StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index c27ead5..735f84d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@@ -736,7 +737,13 @@ public class TestRetryCacheWithHA {
DatanodeInfo[] newNodes = new DatanodeInfo[2];
newNodes[0] = nodes[0];
newNodes[1] = nodes[1];
- String[] storageIDs = {"s0", "s1"};
+ final DatanodeManager dm = cluster.getNamesystem(0).getBlockManager()
+ .getDatanodeManager();
+ final String storageID1 = dm.getDatanode(newNodes[0]).getStorageInfos()[0]
+ .getStorageID();
+ final String storageID2 = dm.getDatanode(newNodes[1]).getStorageInfos()[0]
+ .getStorageID();
+ String[] storageIDs = {storageID1, storageID2};
client.getNamenode().updatePipeline(client.getClientName(), oldBlock,
newBlock, newNodes, storageIDs);
[2/2] hadoop git commit: HDFS-9040. Erasure coding: coordinate data
streamers in DFSStripedOutputStream. Contributed by Jing Zhao and Walter Su.
Posted by ji...@apache.org.
HDFS-9040. Erasure coding: coordinate data streamers in DFSStripedOutputStream. Contributed by Jing Zhao and Walter Su.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6419900a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6419900a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6419900a
Branch: refs/heads/HDFS-7285
Commit: 6419900ac24a5493827abf9b5d90373bc1043e0b
Parents: c09dc25
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Sep 28 14:40:27 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Sep 28 14:40:27 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/hdfs/protocol/DatanodeID.java | 2 +
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 62 +-
.../hadoop/hdfs/DFSStripedOutputStream.java | 603 ++++++++++++++-----
.../org/apache/hadoop/hdfs/DataStreamer.java | 212 +++----
.../apache/hadoop/hdfs/StripedDataStreamer.java | 342 +++--------
.../BlockUnderConstructionFeature.java | 30 +-
.../server/blockmanagement/DatanodeManager.java | 4 +
.../hadoop/hdfs/util/StripedBlockUtil.java | 23 +-
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 31 +-
.../apache/hadoop/hdfs/StripedFileTestUtil.java | 213 ++++++-
.../hadoop/hdfs/TestDFSStripedOutputStream.java | 144 +----
.../TestDFSStripedOutputStreamWithFailure.java | 300 ++++-----
.../hdfs/TestWriteStripedFileWithFailure.java | 8 +
.../hdfs/server/balancer/TestBalancer.java | 4 +-
.../hadoop/hdfs/server/mover/TestMover.java | 4 +-
.../TestAddOverReplicatedStripedBlocks.java | 12 +-
.../namenode/ha/TestRetryCacheWithHA.java | 9 +-
18 files changed, 1068 insertions(+), 938 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
index 6d72285..c709cbd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
@@ -38,6 +38,8 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceStability.Evolving
public class DatanodeID implements Comparable<DatanodeID> {
public static final DatanodeID[] EMPTY_ARRAY = {};
+ public static final DatanodeID EMPTY_DATANODE_ID = new DatanodeID("null",
+ "null", "null", 0, 0, 0, 0);
private String ipAddr; // IP address
private String hostName; // hostname claimed by datanode
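
This excerpt does not include the call sites of the new EMPTY_DATANODE_ID constant, so the following is only a generic, self-contained sketch of the sentinel (null-object) idea such a constant enables: callers can hand back and compare against a well-known placeholder instead of null. All names in it are hypothetical:

// Hypothetical illustration of the sentinel/null-object pattern suggested by
// EMPTY_DATANODE_ID; none of these names come from the patch.
final class NodeId {
  static final NodeId EMPTY = new NodeId("null", 0);
  final String host;
  final int port;
  NodeId(String host, int port) { this.host = host; this.port = port; }
}

class SentinelSketch {
  static NodeId locateNode(boolean available) {
    // When no live node is known, report the sentinel rather than null.
    return available ? new NodeId("dn1.example.com", 50010) : NodeId.EMPTY;
  }

  public static void main(String[] args) {
    NodeId n = locateNode(false);
    System.out.println(n == NodeId.EMPTY ? "no datanode available" : "node=" + n.host);
  }
}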
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index d62dbac..6a01d61 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -450,3 +450,6 @@
HDFS-8882. Erasure Coding: Use datablocks, parityblocks and cell size from
ErasureCodingPolicy (Vinayakumar B via zhz)
+
+ HDFS-9040. Erasure coding: coordinate data streamers in
+ DFSStripedOutputStream. (jing9 and Walter Su)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 4923c86..e77a00a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
@@ -212,14 +213,17 @@ public class DFSOutputStream extends FSOutputSummer
/** Construct a new output stream for creating a file. */
protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
- DataChecksum checksum, String[] favoredNodes) throws IOException {
+ DataChecksum checksum, String[] favoredNodes, boolean createStreamer)
+ throws IOException {
this(dfsClient, src, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
- streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
- cachingStrategy, byteArrayManager, favoredNodes);
+ if (createStreamer) {
+ streamer = new DataStreamer(stat, null, dfsClient, src, progress,
+ checksum, cachingStrategy, byteArrayManager, favoredNodes);
+ }
}
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
@@ -276,7 +280,7 @@ public class DFSOutputStream extends FSOutputSummer
flag, progress, checksum, favoredNodes);
} else {
out = new DFSOutputStream(dfsClient, src, stat,
- flag, progress, checksum, favoredNodes);
+ flag, progress, checksum, favoredNodes, true);
}
out.start();
return out;
@@ -476,7 +480,7 @@ public class DFSOutputStream extends FSOutputSummer
*
* @throws IOException
*/
- protected void endBlock() throws IOException {
+ void endBlock() throws IOException {
if (getStreamer().getBytesCurBlock() == blockSize) {
setCurrentPacketToEmpty();
enqueueCurrentPacket();
@@ -921,4 +925,52 @@ public class DFSOutputStream extends FSOutputSummer
public String toString() {
return getClass().getSimpleName() + ":" + streamer;
}
+
+ static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, DFSClient dfsClient,
+ String src, ExtendedBlock prevBlock, long fileId, String[] favoredNodes)
+ throws IOException {
+ final DfsClientConf conf = dfsClient.getConf();
+ int retries = conf.getNumBlockWriteLocateFollowingRetry();
+ long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
+ long localstart = Time.monotonicNow();
+ while (true) {
+ try {
+ return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
+ excludedNodes, fileId, favoredNodes);
+ } catch (RemoteException e) {
+ IOException ue = e.unwrapRemoteException(FileNotFoundException.class,
+ AccessControlException.class,
+ NSQuotaExceededException.class,
+ DSQuotaExceededException.class,
+ QuotaByStorageTypeExceededException.class,
+ UnresolvedPathException.class);
+ if (ue != e) {
+ throw ue; // no need to retry these exceptions
+ }
+ if (NotReplicatedYetException.class.getName().equals(e.getClassName())) {
+ if (retries == 0) {
+ throw e;
+ } else {
+ --retries;
+ LOG.info("Exception while adding a block", e);
+ long elapsed = Time.monotonicNow() - localstart;
+ if (elapsed > 5000) {
+ LOG.info("Waiting for replication for " + (elapsed / 1000)
+ + " seconds");
+ }
+ try {
+ LOG.warn("NotReplicatedYetException sleeping " + src
+ + " retries left " + retries);
+ Thread.sleep(sleeptime);
+ sleeptime *= 2;
+ } catch (InterruptedException ie) {
+ LOG.warn("Caught exception", ie);
+ }
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
}
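
The new static DFSOutputStream.addBlock helper above wraps the namenode addBlock RPC in a retry loop: known non-retriable RemoteExceptions are unwrapped and rethrown immediately, while NotReplicatedYetException is retried with a sleep that doubles on every attempt. The standalone sketch below shows only that retry-with-exponential-backoff shape; the operation, retry count and delay are illustrative and not taken from the patch:

import java.io.IOException;
import java.util.concurrent.Callable;

// Standalone sketch of the retry-with-exponential-backoff pattern used by the
// new DFSOutputStream#addBlock helper. The fake operation, retry count and
// initial delay are illustrative; the real code retries only
// NotReplicatedYetException and rethrows everything else.
class RetryWithBackoffSketch {
  static <T> T retry(Callable<T> op, int retries, long initialSleepMs)
      throws Exception {
    long sleep = initialSleepMs;
    while (true) {
      try {
        return op.call();
      } catch (IOException e) {          // stand-in for the retriable exception
        if (retries-- == 0) {
          throw e;                       // out of retries: give up
        }
        Thread.sleep(sleep);             // wait, then double the delay
        sleep *= 2;
      }
    }
  }

  public static void main(String[] args) throws Exception {
    final int[] attempts = {0};
    String result = retry(() -> {
      if (++attempts[0] < 3) {
        throw new IOException("not ready yet");   // simulated transient failure
      }
      return "block allocated on attempt " + attempts[0];
    }, 5, 100);
    System.out.println(result);
  }
}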
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index d3a054a..c145a2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -25,23 +25,34 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
@@ -59,23 +70,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
private final List<BlockingQueue<T>> queues;
MultipleBlockingQueue(int numQueue, int queueSize) {
- queues = new ArrayList<>(numQueue);
+ List<BlockingQueue<T>> list = new ArrayList<>(numQueue);
for (int i = 0; i < numQueue; i++) {
- queues.add(new LinkedBlockingQueue<T>(queueSize));
+ list.add(new LinkedBlockingQueue<T>(queueSize));
}
- }
-
- boolean isEmpty() {
- for(int i = 0; i < queues.size(); i++) {
- if (!queues.get(i).isEmpty()) {
- return false;
- }
- }
- return true;
- }
-
- int numQueues() {
- return queues.size();
+ queues = Collections.synchronizedList(list);
}
void offer(int i, T object) {
@@ -92,6 +91,14 @@ public class DFSStripedOutputStream extends DFSOutputStream {
}
}
+ T takeWithTimeout(int i) throws InterruptedIOException {
+ try {
+ return queues.get(i).poll(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, e);
+ }
+ }
+
T poll(int i) {
return queues.get(i).poll();
}
@@ -99,23 +106,44 @@ public class DFSStripedOutputStream extends DFSOutputStream {
T peek(int i) {
return queues.get(i).peek();
}
+
+ void clear() {
+ for (BlockingQueue<T> q : queues) {
+ q.clear();
+ }
+ }
}
/** Coordinate the communication between the streamers. */
- class Coordinator {
+ static class Coordinator {
+ /**
+ * The next internal block to write to for each streamers. The
+ * DFSStripedOutputStream makes the {@link ClientProtocol#addBlock} RPC to
+ * get a new block group. The block group is split to internal blocks, which
+ * are then distributed into the queue for streamers to retrieve.
+ */
private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
+ /**
+ * Used to sync among all the streamers before allocating a new block. The
+ * DFSStripedOutputStream uses this to make sure every streamer has finished
+ * writing the previous block.
+ */
private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
+ /**
+ * The following data structures are used for syncing while handling errors
+ */
private final MultipleBlockingQueue<LocatedBlock> newBlocks;
- private final MultipleBlockingQueue<ExtendedBlock> updateBlocks;
+ private final Map<StripedDataStreamer, Boolean> updateStreamerMap;
+ private final MultipleBlockingQueue<Boolean> streamerUpdateResult;
- Coordinator(final DfsClientConf conf, final int numDataBlocks,
- final int numAllBlocks) {
+ Coordinator(final int numAllBlocks) {
followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
- endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1);
-
+ endBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
- updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
+ updateStreamerMap = Collections.synchronizedMap(
+ new HashMap<StripedDataStreamer, Boolean>(numAllBlocks));
+ streamerUpdateResult = new MultipleBlockingQueue<>(numAllBlocks, 1);
}
MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
@@ -126,68 +154,28 @@ public class DFSStripedOutputStream extends DFSOutputStream {
return newBlocks;
}
- MultipleBlockingQueue<ExtendedBlock> getUpdateBlocks() {
- return updateBlocks;
- }
-
- StripedDataStreamer getStripedDataStreamer(int i) {
- return DFSStripedOutputStream.this.getStripedDataStreamer(i);
- }
-
void offerEndBlock(int i, ExtendedBlock block) {
endBlocks.offer(i, block);
}
- ExtendedBlock takeEndBlock(int i) throws InterruptedIOException {
- return endBlocks.take(i);
+ void offerStreamerUpdateResult(int i, boolean success) {
+ streamerUpdateResult.offer(i, success);
}
- boolean hasAllEndBlocks() {
- for(int i = 0; i < endBlocks.numQueues(); i++) {
- if (endBlocks.peek(i) == null) {
- return false;
- }
- }
- return true;
+ boolean takeStreamerUpdateResult(int i) throws InterruptedIOException {
+ return streamerUpdateResult.take(i);
}
- void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
- ExtendedBlock b = endBlocks.peek(i);
- if (b == null) {
- // streamer just has failed, put end block and continue
- b = block;
- offerEndBlock(i, b);
- }
- b.setNumBytes(newBytes);
+ void updateStreamer(StripedDataStreamer streamer,
+ boolean success) {
+ assert !updateStreamerMap.containsKey(streamer);
+ updateStreamerMap.put(streamer, success);
}
- /** @return a block representing the entire block group. */
- ExtendedBlock getBlockGroup() {
- final StripedDataStreamer s0 = getStripedDataStreamer(0);
- final ExtendedBlock b0 = s0.getBlock();
- if (b0 == null) {
- return null;
- }
-
- final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0;
-
- final ExtendedBlock block = new ExtendedBlock(b0);
- long numBytes = atBlockGroupBoundary? b0.getNumBytes(): s0.getBytesCurBlock();
- for (int i = 1; i < numAllBlocks; i++) {
- final StripedDataStreamer si = getStripedDataStreamer(i);
- final ExtendedBlock bi = si.getBlock();
- if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) {
- block.setGenerationStamp(bi.getGenerationStamp());
- }
- if (i < numDataBlocks) {
- numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
- }
- }
- block.setNumBytes(numBytes);
- if (LOG.isDebugEnabled()) {
- LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes());
- }
- return block;
+ void clearFailureStates() {
+ newBlocks.clear();
+ updateStreamerMap.clear();
+ streamerUpdateResult.clear();
}
}
@@ -263,18 +251,16 @@ public class DFSStripedOutputStream extends DFSOutputStream {
private final int cellSize;
private final int numAllBlocks;
private final int numDataBlocks;
-
- @Override
- ExtendedBlock getBlock() {
- return coordinator.getBlockGroup();
- }
+ private ExtendedBlock currentBlockGroup;
+ private final String[] favoredNodes;
+ private final List<StripedDataStreamer> failedStreamers;
/** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes)
throws IOException {
- super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
+ super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating DFSStripedOutputStream for " + src);
}
@@ -284,12 +270,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
cellSize = ecPolicy.getCellSize();
numDataBlocks = ecPolicy.getNumDataUnits();
numAllBlocks = numDataBlocks + numParityBlocks;
+ this.favoredNodes = favoredNodes;
+ failedStreamers = new ArrayList<>();
encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
numDataBlocks, numParityBlocks);
- coordinator = new Coordinator(dfsClient.getConf(),
- numDataBlocks, numAllBlocks);
+ coordinator = new Coordinator(numAllBlocks);
try {
cellBuffers = new CellBuffers(numParityBlocks);
} catch (InterruptedException ie) {
@@ -297,14 +284,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
"Failed to create cell buffers", ie);
}
- List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
+ streamers = new ArrayList<>(numAllBlocks);
for (short i = 0; i < numAllBlocks; i++) {
StripedDataStreamer streamer = new StripedDataStreamer(stat,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
favoredNodes, i, coordinator);
- s.add(streamer);
+ streamers.add(streamer);
}
- streamers = Collections.unmodifiableList(s);
currentPackets = new DFSPacket[streamers.size()];
setCurrentStreamer(0);
}
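As a concrete sizing example, assuming the RS(6,3) schema commonly used on this branch (the numbers below are illustrative, not taken from the patch), the constructor ends up with nine streamers and nine single-slot queues:

public class StripeSizingExample {
  public static void main(String[] args) {
    int numDataBlocks = 6;      // RS(6,3) data units (assumed for illustration)
    int numParityBlocks = 3;    // RS(6,3) parity units
    int cellSize = 64 * 1024;   // example cell size

    int numAllBlocks = numDataBlocks + numParityBlocks;   // 9 streamers, 9 queues
    int stripeDataSize = numDataBlocks * cellSize;        // bytes of user data per stripe

    System.out.println("streamers/queues = " + numAllBlocks);    // 9
    System.out.println("stripeDataSize   = " + stripeDataSize);  // 393216
  }
}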
@@ -318,17 +304,19 @@ public class DFSStripedOutputStream extends DFSOutputStream {
}
private synchronized StripedDataStreamer getCurrentStreamer() {
- return (StripedDataStreamer)streamer;
+ return (StripedDataStreamer) streamer;
}
private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) {
// backup currentPacket for current streamer
- int oldIdx = streamers.indexOf(streamer);
- if (oldIdx >= 0) {
- currentPackets[oldIdx] = currentPacket;
+ if (streamer != null) {
+ int oldIdx = streamers.indexOf(getCurrentStreamer());
+ if (oldIdx >= 0) {
+ currentPackets[oldIdx] = currentPacket;
+ }
}
- streamer = streamers.get(newIdx);
+ streamer = getStripedDataStreamer(newIdx);
currentPacket = currentPackets[newIdx];
adjustChunkBoundary();
@@ -350,40 +338,127 @@ public class DFSStripedOutputStream extends DFSOutputStream {
encoder.encode(dataBuffers, parityBuffers);
}
-
- private void checkStreamers(boolean setExternalError) throws IOException {
- int count = 0;
+ /**
+ * Check all the existing StripedDataStreamers and find newly failed streamers.
+ * @return The newly failed streamers.
+ * @throws IOException if fewer than {@link #numDataBlocks} streamers are still
+ * healthy.
+ */
+ private Set<StripedDataStreamer> checkStreamers() throws IOException {
+ Set<StripedDataStreamer> newFailed = new HashSet<>();
for(StripedDataStreamer s : streamers) {
- if (!s.isFailed()) {
- if (setExternalError && s.getBlock() != null) {
- s.getErrorState().initExternalError();
- }
- count++;
+ if (!s.isHealthy() && !failedStreamers.contains(s)) {
+ newFailed.add(s);
}
}
+
+ final int failCount = failedStreamers.size() + newFailed.size();
if (LOG.isDebugEnabled()) {
LOG.debug("checkStreamers: " + streamers);
- LOG.debug("count=" + count);
+ LOG.debug("healthy streamer count=" + (numAllBlocks - failCount));
+ LOG.debug("original failed streamers: " + failedStreamers);
+ LOG.debug("newly failed streamers: " + newFailed);
}
- if (count < numDataBlocks) {
- throw new IOException("Failed: the number of remaining blocks = "
- + count + " < the number of data blocks = " + numDataBlocks);
+ if (failCount > (numAllBlocks - numDataBlocks)) {
+ throw new IOException("Failed: the number of failed blocks = "
+ + failCount + " > the number of parity blocks = "
+ + (numAllBlocks - numDataBlocks));
}
+ return newFailed;
}
private void handleStreamerFailure(String err, Exception e)
throws IOException {
- handleStreamerFailure(err, e, true);
- }
-
- private void handleStreamerFailure(String err, Exception e,
- boolean setExternalError) throws IOException {
LOG.warn("Failed: " + err + ", " + this, e);
- getCurrentStreamer().setFailed(true);
- checkStreamers(setExternalError);
+ getCurrentStreamer().getErrorState().setInternalError();
+ getCurrentStreamer().close(true);
+ checkStreamers();
currentPacket = null;
}
+ private void replaceFailedStreamers() {
+ assert streamers.size() == numAllBlocks;
+ for (short i = 0; i < numAllBlocks; i++) {
+ final StripedDataStreamer oldStreamer = getStripedDataStreamer(i);
+ if (!oldStreamer.isHealthy()) {
+ StripedDataStreamer streamer = new StripedDataStreamer(oldStreamer.stat,
+ dfsClient, src, oldStreamer.progress,
+ oldStreamer.checksum4WriteBlock, cachingStrategy, byteArrayManager,
+ favoredNodes, i, coordinator);
+ streamers.set(i, streamer);
+ currentPackets[i] = null;
+ if (i == 0) {
+ this.streamer = streamer;
+ }
+ streamer.start();
+ }
+ }
+ }
+
+ private void waitEndBlocks(int i) throws IOException {
+ while (getStripedDataStreamer(i).isHealthy()) {
+ final ExtendedBlock b = coordinator.endBlocks.takeWithTimeout(i);
+ if (b != null) {
+ StripedBlockUtil.checkBlocks(currentBlockGroup, i, b);
+ return;
+ }
+ }
+ }
+
+ private void allocateNewBlock() throws IOException {
+ if (currentBlockGroup != null) {
+ for (int i = 0; i < numAllBlocks; i++) {
+ // sync all the healthy streamers before writing to the new block
+ waitEndBlocks(i);
+ }
+ }
+ failedStreamers.clear();
+ // replace failed streamers
+ replaceFailedStreamers();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Allocating new block group. The previous block group: "
+ + currentBlockGroup);
+ }
+
+ // TODO collect excludedNodes from all the data streamers
+ final LocatedBlock lb = addBlock(null, dfsClient, src, currentBlockGroup,
+ fileId, favoredNodes);
+ assert lb.isStriped();
+ if (lb.getLocations().length < numDataBlocks) {
+ throw new IOException("Failed to get " + numDataBlocks
+ + " nodes from namenode: blockGroupSize= " + numAllBlocks
+ + ", blocks.length= " + lb.getLocations().length);
+ }
+ // assign the new block to the current block group
+ currentBlockGroup = lb.getBlock();
+
+ final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+ (LocatedStripedBlock) lb, cellSize, numDataBlocks,
+ numAllBlocks - numDataBlocks);
+ for (int i = 0; i < blocks.length; i++) {
+ StripedDataStreamer si = getStripedDataStreamer(i);
+ if (si.isHealthy()) { // skipping failed data streamer
+ if (blocks[i] == null) {
+ // Set the exception and close the streamer as there are no block
+ // locations found for the parity block.
+ LOG.warn("Failed to get block location for parity block, index=" + i);
+ si.getLastException().set(
+ new IOException("Failed to get following block, i=" + i));
+ si.getErrorState().setInternalError();
+ si.close(true);
+ } else {
+ coordinator.getFollowingBlocks().offer(i, blocks[i]);
+ }
+ }
+ }
+ }
+
+ private boolean shouldEndBlockGroup() {
+ return currentBlockGroup != null &&
+ currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;
+ }
+
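A simplified, standalone sketch of the allocation step above (all names and values are illustrative; the real splitting is done by StripedBlockUtil.parseStripedBlockGroup): one addBlock call returns a block group, which is split into internal blocks by index and offered only to healthy streamers.

import java.util.Arrays;

public class AllocateBlockSketch {
  public static void main(String[] args) {
    long blockGroupId = -1000L;       // illustrative; real striped group ids are large negative values
    int numAllBlocks = 9;             // 6 data + 3 parity, for example
    boolean[] healthy = new boolean[numAllBlocks];
    Arrays.fill(healthy, true);
    healthy[4] = false;               // pretend streamer #4 has already failed

    // Internal block i of a striped group uses id = groupId + i
    // (see the StripedBlockUtil.checkBlocks change later in this patch).
    for (int i = 0; i < numAllBlocks; i++) {
      if (!healthy[i]) {
        continue;                     // skip failed streamers, as the patch does
      }
      long internalBlockId = blockGroupId + i;
      System.out.println("offer to streamer " + i + ": blk_" + internalBlockId);
    }
  }
}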
@Override
protected synchronized void writeChunk(byte[] bytes, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
@@ -392,8 +467,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
final int pos = cellBuffers.addTo(index, bytes, offset, len);
final boolean cellFull = pos == cellSize;
- final long oldBytes = current.getBytesCurBlock();
- if (!current.isFailed()) {
+ if (currentBlockGroup == null || shouldEndBlockGroup()) {
+ // the incoming data should belong to a new block group. Allocate a new one.
+ allocateNewBlock();
+ }
+
+ currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len);
+ if (current.isHealthy()) {
try {
super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
} catch(Exception e) {
@@ -401,12 +481,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
}
}
- if (current.isFailed()) {
- final long newBytes = oldBytes + len;
- coordinator.setBytesEndBlock(index, newBytes, current.getBlock());
- current.setBytesCurBlock(newBytes);
- }
-
// Two extra steps are needed when a striping cell is full:
// 1. Forward the current index pointer
// 2. Generate parity packets if a full stripe of data cells are present
@@ -419,11 +493,209 @@ public class DFSStripedOutputStream extends DFSOutputStream {
cellBuffers.flipDataBuffers();
writeParityCells();
next = 0;
+ // check failure state for all the streamers. Bump GS if necessary
+ checkStreamerFailures();
+
+ // if this is the end of the block group, end each internal block
+ if (shouldEndBlockGroup()) {
+ for (int i = 0; i < numAllBlocks; i++) {
+ final StripedDataStreamer s = setCurrentStreamer(i);
+ if (s.isHealthy()) {
+ try {
+ endBlock();
+ } catch (IOException ignored) {}
+ }
+ }
+ }
}
setCurrentStreamer(next);
}
}
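To make the cell and stripe bookkeeping in writeChunk concrete, here is a small standalone example of how a byte offset inside a block group maps to a data streamer under the round-robin cell layout (cell size and schema are illustrative):

public class CellMappingExample {
  public static void main(String[] args) {
    int cellSize = 64 * 1024;   // illustrative cell size
    int numDataBlocks = 6;      // illustrative RS(6,3)

    long offsetInGroup = 200_000L;                          // a byte offset inside the block group
    long cellIndex = offsetInGroup / cellSize;              // which cell the byte falls into
    int streamerIndex = (int) (cellIndex % numDataBlocks);  // cells are laid out round-robin
    long offsetInCell = offsetInGroup % cellSize;

    // With 64K cells, byte 200000 is in cell 3, i.e. data streamer #3, at offset 3392.
    System.out.println("cell=" + cellIndex + ", streamer=" + streamerIndex
        + ", offsetInCell=" + offsetInCell);
  }
}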
+ @Override
+ void enqueueCurrentPacketFull() throws IOException {
+ LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
+ + " appendChunk={}, {}", currentPacket, src, getStreamer()
+ .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
+ getStreamer());
+ enqueueCurrentPacket();
+ adjustChunkBoundary();
+ // no need to end block here
+ }
+
+ private Set<StripedDataStreamer> markExternalErrorOnStreamers() {
+ Set<StripedDataStreamer> healthySet = new HashSet<>();
+ for (StripedDataStreamer streamer : streamers) {
+ if (streamer.isHealthy() &&
+ streamer.getStage() == BlockConstructionStage.DATA_STREAMING) {
+ streamer.setExternalError();
+ healthySet.add(streamer);
+ }
+ }
+ return healthySet;
+ }
+
+ /**
+ * Check and handle data streamer failures. This is called only when we have
+ * written a full stripe (i.e., enqueued all packets for a full stripe), or
+ * when we're closing the output stream.
+ */
+ private void checkStreamerFailures() throws IOException {
+ Set<StripedDataStreamer> newFailed = checkStreamers();
+ if (newFailed.size() > 0) {
+ // for healthy streamers, wait till all of them have fetched the new block
+ // and flushed out all the enqueued packets.
+ flushAllInternals();
+ }
+ // get all the current failed streamers after the flush
+ newFailed = checkStreamers();
+ while (newFailed.size() > 0) {
+ failedStreamers.addAll(newFailed);
+ coordinator.clearFailureStates();
+
+ // mark all the healthy streamers as external error
+ Set<StripedDataStreamer> healthySet = markExternalErrorOnStreamers();
+
+ // we have newly failed streamers, update block for pipeline
+ final ExtendedBlock newBG = updateBlockForPipeline(healthySet);
+
+ // wait for all the healthy streamers to
+ // 1) get the updated block info
+ // 2) create a new block output stream
+ newFailed = waitCreatingNewStreams(healthySet);
+ if (newFailed.size() + failedStreamers.size() >
+ numAllBlocks - numDataBlocks) {
+ throw new IOException(
+ "Data streamers failed while creating new block streams: "
+ + newFailed + ". There are not enough healthy streamers.");
+ }
+ for (StripedDataStreamer failedStreamer : newFailed) {
+ assert !failedStreamer.isHealthy();
+ }
+
+ // TODO we can also succeed if all the failed streamers have not taken
+ // the updated block
+ if (newFailed.size() == 0) {
+ // reset external error state of all the streamers
+ for (StripedDataStreamer streamer : healthySet) {
+ assert streamer.isHealthy();
+ streamer.getErrorState().reset();
+ }
+ updatePipeline(newBG);
+ }
+ for (int i = 0; i < numAllBlocks; i++) {
+ coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0);
+ }
+ }
+ }
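The loop above can be read as a generic retry protocol: collect new failures, give up once they exceed the parity count, otherwise bump the generation stamp and let the healthy streamers rebuild their streams. A toy, self-contained simulation of that control flow (all names and values are illustrative) is:

import java.util.HashSet;
import java.util.Set;

/** Toy simulation of the retry loop in checkStreamerFailures; all names and values are illustrative. */
public class FailureHandlingLoopSketch {
  public static void main(String[] args) {
    Set<Integer> failed = new HashSet<>();
    Set<Integer> newFailed = new HashSet<>();
    newFailed.add(4);                      // pretend streamer #4 just failed
    int numParityBlocks = 3;
    long generationStamp = 1001;

    while (!newFailed.isEmpty()) {
      failed.addAll(newFailed);
      if (failed.size() > numParityBlocks) {
        throw new IllegalStateException("too many failed streamers: " + failed);
      }
      generationStamp++;                   // "bump GS": ask the NameNode for a new generation stamp
      // ... healthy streamers would now recreate their block streams with the new GS ...
      newFailed = new HashSet<>();         // in this toy run the retry succeeds, so the loop exits
    }
    System.out.println("recovered with GS=" + generationStamp + ", failed=" + failed);
  }
}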
+
+ private int checkStreamerUpdates(Set<StripedDataStreamer> failed,
+ Set<StripedDataStreamer> streamers) {
+ for (StripedDataStreamer streamer : streamers) {
+ if (!coordinator.updateStreamerMap.containsKey(streamer)) {
+ if (!streamer.isHealthy() &&
+ coordinator.getNewBlocks().peek(streamer.getIndex()) != null) {
+ // this streamer had an internal error before getting the updated block
+ failed.add(streamer);
+ }
+ }
+ }
+ return coordinator.updateStreamerMap.size() + failed.size();
+ }
+
+ private Set<StripedDataStreamer> waitCreatingNewStreams(
+ Set<StripedDataStreamer> healthyStreamers) throws IOException {
+ Set<StripedDataStreamer> failed = new HashSet<>();
+ final int expectedNum = healthyStreamers.size();
+ final long socketTimeout = dfsClient.getConf().getSocketTimeout();
+ // the total wait time should be less than the socket timeout, otherwise
+ // a slow streamer may cause other streamers to time out. Here we wait for
+ // half of the socket timeout.
+ long remainingTime = socketTimeout > 0 ? socketTimeout / 2 : Long.MAX_VALUE;
+ final long waitInterval = 1000;
+ synchronized (coordinator) {
+ while (checkStreamerUpdates(failed, healthyStreamers) < expectedNum
+ && remainingTime > 0) {
+ try {
+ long start = Time.monotonicNow();
+ coordinator.wait(waitInterval);
+ remainingTime -= Time.monotonicNow() - start;
+ } catch (InterruptedException e) {
+ throw DFSUtil.toInterruptedIOException("Interrupted when waiting" +
+ " for results of updating striped streamers", e);
+ }
+ }
+ }
+ synchronized (coordinator) {
+ for (StripedDataStreamer streamer : healthyStreamers) {
+ if (!coordinator.updateStreamerMap.containsKey(streamer)) {
+ // close the streamer if it is too slow to create a new connection
+ streamer.setStreamerAsClosed();
+ failed.add(streamer);
+ }
+ }
+ }
+ for (Map.Entry<StripedDataStreamer, Boolean> entry :
+ coordinator.updateStreamerMap.entrySet()) {
+ if (!entry.getValue()) {
+ failed.add(entry.getKey());
+ }
+ }
+ for (StripedDataStreamer failedStreamer : failed) {
+ healthyStreamers.remove(failedStreamer);
+ }
+ return failed;
+ }
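The bounded wait above is the standard wait/notify pattern with a time budget; a minimal standalone sketch of the same pattern (names are illustrative and unrelated to the patch's classes) is:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BoundedWaitSketch {
  public static void main(String[] args) throws Exception {
    final Object lock = new Object();
    final Map<Integer, Boolean> results = new ConcurrentHashMap<>();
    final int expected = 3;

    // Workers report their result and wake the waiter, as the streamers do.
    for (int i = 0; i < expected; i++) {
      final int id = i;
      new Thread(() -> {
        try { Thread.sleep(100L * (id + 1)); } catch (InterruptedException ignored) { }
        synchronized (lock) {
          results.put(id, true);
          lock.notifyAll();
        }
      }).start();
    }

    // Waiter: bounded wait so one slow or stuck worker cannot block forever.
    long remaining = 2000;                   // overall budget in ms
    synchronized (lock) {
      while (results.size() < expected && remaining > 0) {
        long start = System.nanoTime();
        lock.wait(200);                      // wake up periodically to re-check
        remaining -= (System.nanoTime() - start) / 1_000_000;
      }
    }
    System.out.println("collected " + results.size() + "/" + expected + " results");
  }
}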
+
+ /**
+ * Call {@link ClientProtocol#updateBlockForPipeline} and assign the updated
+ * block to the healthy streamers.
+ * @param healthyStreamers The healthy data streamers. These streamers join
+ * the failure handling.
+ */
+ private ExtendedBlock updateBlockForPipeline(
+ Set<StripedDataStreamer> healthyStreamers) throws IOException {
+ final LocatedBlock updated = dfsClient.namenode.updateBlockForPipeline(
+ currentBlockGroup, dfsClient.clientName);
+ final long newGS = updated.getBlock().getGenerationStamp();
+ ExtendedBlock newBlock = new ExtendedBlock(currentBlockGroup);
+ newBlock.setGenerationStamp(newGS);
+ final LocatedBlock[] updatedBlks = StripedBlockUtil.parseStripedBlockGroup(
+ (LocatedStripedBlock) updated, cellSize, numDataBlocks,
+ numAllBlocks - numDataBlocks);
+
+ for (int i = 0; i < numAllBlocks; i++) {
+ StripedDataStreamer si = getStripedDataStreamer(i);
+ if (healthyStreamers.contains(si)) {
+ final LocatedBlock lb = new LocatedBlock(new ExtendedBlock(newBlock),
+ null, null, null, -1, updated.isCorrupt(), null);
+ lb.setBlockToken(updatedBlks[i].getBlockToken());
+ coordinator.getNewBlocks().offer(i, lb);
+ }
+ }
+ return newBlock;
+ }
+
+ private void updatePipeline(ExtendedBlock newBG) throws IOException {
+ final DatanodeInfo[] newNodes = new DatanodeInfo[numAllBlocks];
+ final String[] newStorageIDs = new String[numAllBlocks];
+ for (int i = 0; i < numAllBlocks; i++) {
+ final StripedDataStreamer streamer = getStripedDataStreamer(i);
+ final DatanodeInfo[] nodes = streamer.getNodes();
+ final String[] storageIDs = streamer.getStorageIDs();
+ if (streamer.isHealthy() && nodes != null && storageIDs != null) {
+ newNodes[i] = nodes[0];
+ newStorageIDs[i] = storageIDs[0];
+ } else {
+ newNodes[i] = new DatanodeInfo(DatanodeID.EMPTY_DATANODE_ID);
+ newStorageIDs[i] = "";
+ }
+ }
+ dfsClient.namenode.updatePipeline(dfsClient.clientName, currentBlockGroup,
+ newBG, newNodes, newStorageIDs);
+ currentBlockGroup = newBG;
+ }
+
private int stripeDataSize() {
return numDataBlocks * cellSize;
}
@@ -500,28 +772,16 @@ public class DFSStripedOutputStream extends DFSOutputStream {
}
}
- /**
- * Simply add bytesCurBlock together. Note that this result is not accurately
- * the size of the block group.
- */
- private long getCurrentSumBytes() {
- long sum = 0;
- for (int i = 0; i < numDataBlocks; i++) {
- sum += streamers.get(i).getBytesCurBlock();
- }
- return sum;
- }
-
private boolean generateParityCellsForLastStripe() {
- final long currentBlockGroupBytes = getCurrentSumBytes();
- if (currentBlockGroupBytes % stripeDataSize() == 0) {
+ final long currentBlockGroupBytes = currentBlockGroup == null ?
+ 0 : currentBlockGroup.getNumBytes();
+ final long lastStripeSize = currentBlockGroupBytes % stripeDataSize();
+ if (lastStripeSize == 0) {
return false;
}
- final int firstCellSize =
- (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
- final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize?
- firstCellSize : cellSize;
+ final long parityCellSize = lastStripeSize < cellSize?
+ lastStripeSize : cellSize;
final ByteBuffer[] buffers = cellBuffers.getBuffers();
for (int i = 0; i < numAllBlocks; i++) {
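A small worked example of the last-stripe arithmetic above (all numbers are illustrative): the parity cell covers the last partial stripe, capped at one full cell.

public class LastStripeParityExample {
  public static void main(String[] args) {
    int cellSize = 64 * 1024;               // illustrative
    int numDataBlocks = 6;
    long stripeDataSize = (long) numDataBlocks * cellSize;    // 393216

    long blockGroupBytes = 1_000_000L;                        // total bytes written so far
    long lastStripeSize = blockGroupBytes % stripeDataSize;   // 213568: a partial stripe
    long parityCellSize = Math.min(lastStripeSize, cellSize); // capped at one cell: 65536

    System.out.println("lastStripeSize=" + lastStripeSize
        + ", parityCellSize=" + parityCellSize);
  }
}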
@@ -550,13 +810,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
cellBuffers.clear();
}
- void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf
- ) throws IOException {
+ void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf)
+ throws IOException {
final StripedDataStreamer current = setCurrentStreamer(index);
final int len = buffer.limit();
final long oldBytes = current.getBytesCurBlock();
- if (!current.isFailed()) {
+ if (current.isHealthy()) {
try {
DataChecksum sum = getDataChecksum();
sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
@@ -570,18 +830,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
}
}
-
- if (current.isFailed()) {
- final long newBytes = oldBytes + len;
- current.setBytesCurBlock(newBytes);
- }
}
@Override
void setClosed() {
super.setClosed();
for (int i = 0; i < numAllBlocks; i++) {
- streamers.get(i).release();
+ getStripedDataStreamer(i).release();
}
cellBuffers.release();
}
@@ -607,37 +862,40 @@ public class DFSStripedOutputStream extends DFSOutputStream {
try {
// flush from all upper layers
- try {
- flushBuffer();
- } catch(Exception e) {
- handleStreamerFailure("flushBuffer " + getCurrentStreamer(), e);
- }
+ flushBuffer();
// if the last stripe is incomplete, generate and write parity cells
if (generateParityCellsForLastStripe()) {
writeParityCells();
}
enqueueAllCurrentPackets();
+ // flush all the data packets
+ flushAllInternals();
+ // check failures
+ checkStreamerFailures();
+
for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i);
- if (!s.isFailed()) {
+ if (s.isHealthy()) {
try {
if (s.getBytesCurBlock() > 0) {
setCurrentPacketToEmpty();
}
- // flush all data to Datanode
+ // flush the last "close" packet to Datanode
flushInternal();
} catch(Exception e) {
- handleStreamerFailure("flushInternal " + s, e, false);
+ // TODO for both close and endBlock, we currently do not handle
+ // failures when sending the last packet. We actually do not need to
+ // bump GS for this kind of failure. Thus counting the total number
+ // of failures may be good enough.
}
}
}
closeThreads(false);
- final ExtendedBlock lastBlock = coordinator.getBlockGroup();
TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
try {
- completeFile(lastBlock);
+ completeFile(currentBlockGroup);
} finally {
scope.close();
}
@@ -652,14 +910,45 @@ public class DFSStripedOutputStream extends DFSOutputStream {
int idx = streamers.indexOf(getCurrentStreamer());
for(int i = 0; i < streamers.size(); i++) {
final StripedDataStreamer si = setCurrentStreamer(i);
- if (!si.isFailed() && currentPacket != null) {
+ if (si.isHealthy() && currentPacket != null) {
try {
enqueueCurrentPacket();
} catch (IOException e) {
- handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e, false);
+ handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e);
}
}
}
setCurrentStreamer(idx);
}
+
+ void flushAllInternals() throws IOException {
+ int current = getCurrentIndex();
+
+ for (int i = 0; i < numAllBlocks; i++) {
+ final StripedDataStreamer s = setCurrentStreamer(i);
+ if (s.isHealthy()) {
+ try {
+ // flush all data to Datanode
+ flushInternal();
+ } catch(Exception e) {
+ handleStreamerFailure("flushInternal " + s, e);
+ }
+ }
+ }
+ setCurrentStreamer(current);
+ }
+
+ static void sleep(long ms, String op) throws InterruptedIOException {
+ try {
+ Thread.sleep(ms);
+ } catch(InterruptedException ie) {
+ throw DFSUtil.toInterruptedIOException(
+ "Sleep interrupted during " + op, ie);
+ }
+ }
+
+ @Override
+ ExtendedBlock getBlock() {
+ return currentBlockGroup;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index c478f1c..a6eb01f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -22,7 +22,6 @@ import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SU
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
@@ -46,16 +45,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
@@ -69,13 +64,10 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
@@ -204,9 +196,12 @@ class DataStreamer extends Daemon {
}
}
+ enum ErrorType {
+ NONE, INTERNAL, EXTERNAL
+ }
+
static class ErrorState {
- private boolean error = false;
- private boolean externalError = false;
+ ErrorType error = ErrorType.NONE;
private int badNodeIndex = -1;
private int restartingNodeIndex = -1;
private long restartingNodeDeadline = 0;
@@ -216,35 +211,47 @@ class DataStreamer extends Daemon {
this.datanodeRestartTimeout = datanodeRestartTimeout;
}
+ synchronized void resetInternalError() {
+ if (hasInternalError()) {
+ error = ErrorType.NONE;
+ }
+ badNodeIndex = -1;
+ restartingNodeIndex = -1;
+ restartingNodeDeadline = 0;
+ }
+
synchronized void reset() {
- error = false;
- externalError = false;
+ error = ErrorType.NONE;
badNodeIndex = -1;
restartingNodeIndex = -1;
restartingNodeDeadline = 0;
}
- synchronized boolean hasError() {
- return error;
+ synchronized boolean hasInternalError() {
+ return error == ErrorType.INTERNAL;
}
- synchronized boolean hasExternalErrorOnly() {
- return error && externalError && !isNodeMarked();
+ synchronized boolean hasExternalError() {
+ return error == ErrorType.EXTERNAL;
}
- synchronized boolean hasDatanodeError() {
- return error && (isNodeMarked() || externalError);
+ synchronized boolean hasError() {
+ return error != ErrorType.NONE;
}
- synchronized void setError(boolean err) {
- this.error = err;
+ synchronized boolean hasDatanodeError() {
+ return error == ErrorType.INTERNAL && isNodeMarked();
}
- synchronized void initExternalError() {
- setError(true);
- this.externalError = true;
+ synchronized void setInternalError() {
+ this.error = ErrorType.INTERNAL;
}
+ synchronized void setExternalError() {
+ if (!hasInternalError()) {
+ this.error = ErrorType.EXTERNAL;
+ }
+ }
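A minimal standalone sketch of the three-state error model introduced here (names are illustrative; the real ErrorState also tracks bad and restarting node indices). The key property, matching setExternalError above, is that an external error never overwrites an internal one:

public class ErrorStateSketch {
  enum ErrorType { NONE, INTERNAL, EXTERNAL }

  static class SimpleErrorState {
    private ErrorType error = ErrorType.NONE;

    synchronized void setInternalError() { error = ErrorType.INTERNAL; }

    synchronized void setExternalError() {
      // an external error must not mask a real (internal) failure of this streamer
      if (error != ErrorType.INTERNAL) {
        error = ErrorType.EXTERNAL;
      }
    }

    synchronized ErrorType get() { return error; }
  }

  public static void main(String[] args) {
    SimpleErrorState s = new SimpleErrorState();
    s.setInternalError();
    s.setExternalError();                 // ignored: the internal error wins
    System.out.println(s.get());          // prints INTERNAL
  }
}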
synchronized void setBadNodeIndex(int index) {
this.badNodeIndex = index;
@@ -306,14 +313,14 @@ class DataStreamer extends Daemon {
}
if (!isRestartingNode()) {
- error = false;
+ error = ErrorType.NONE;
}
badNodeIndex = -1;
}
synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) {
if (restartingNodeIndex >= 0) {
- if (!error) {
+ if (error == ErrorType.NONE) {
throw new IllegalStateException("error=false while checking" +
" restarting node deadline");
}
@@ -345,7 +352,7 @@ class DataStreamer extends Daemon {
private volatile boolean streamerClosed = false;
protected ExtendedBlock block; // its length is number of bytes acked
- private Token<BlockTokenIdentifier> accessToken;
+ protected Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
private ResponseProcessor response = null;
@@ -355,7 +362,7 @@ class DataStreamer extends Daemon {
private final ErrorState errorState;
private BlockConstructionStage stage; // block construction stage
- private long bytesSent = 0; // number of bytes that've been sent
+ protected long bytesSent = 0; // number of bytes that've been sent
private final boolean isLazyPersistFile;
/** Nodes have been used in the pipeline before and have failed. */
@@ -378,13 +385,13 @@ class DataStreamer extends Daemon {
protected final DFSClient dfsClient;
protected final String src;
/** Only for DataTransferProtocol.writeBlock(..) */
- private final DataChecksum checksum4WriteBlock;
- private final Progressable progress;
+ final DataChecksum checksum4WriteBlock;
+ final Progressable progress;
protected final HdfsFileStatus stat;
// appending to existing partial block
private volatile boolean appendChunk = false;
// both dataQueue and ackQueue are protected by dataQueue lock
- private final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
+ protected final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
private final AtomicReference<CachingStrategy> cachingStrategy;
private final ByteArrayManager byteArrayManager;
@@ -401,7 +408,7 @@ class DataStreamer extends Daemon {
CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
private int lastCongestionBackoffTime;
- private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
+ protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
private final String[] favoredNodes;
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
@@ -473,6 +480,10 @@ class DataStreamer extends Daemon {
}
}
+ void setAccessToken(Token<BlockTokenIdentifier> t) {
+ this.accessToken = t;
+ }
+
private void setPipeline(LocatedBlock lb) {
setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
}
@@ -533,7 +544,7 @@ class DataStreamer extends Daemon {
DFSPacket one;
try {
// process datanode IO errors if any
- boolean doSleep = processDatanodeError();
+ boolean doSleep = processDatanodeOrExternalError();
final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
synchronized (dataQueue) {
@@ -696,7 +707,7 @@ class DataStreamer extends Daemon {
}
lastException.set(e);
assert !(e instanceof NullPointerException);
- errorState.setError(true);
+ errorState.setInternalError();
if (!errorState.isNodeMarked()) {
// Not a datanode issue
streamerClosed = true;
@@ -837,6 +848,9 @@ class DataStreamer extends Daemon {
}
}
+ void setStreamerAsClosed() {
+ streamerClosed = true;
+ }
private void checkClosed() throws IOException {
if (streamerClosed) {
@@ -857,7 +871,7 @@ class DataStreamer extends Daemon {
}
}
- private void closeStream() {
+ void closeStream() {
final MultipleIOException.Builder b = new MultipleIOException.Builder();
if (blockStream != null) {
@@ -1037,7 +1051,7 @@ class DataStreamer extends Daemon {
} catch (Exception e) {
if (!responderClosed) {
lastException.set(e);
- errorState.setError(true);
+ errorState.setInternalError();
errorState.markFirstNodeIfNotMarked();
synchronized (dataQueue) {
dataQueue.notifyAll();
@@ -1059,18 +1073,18 @@ class DataStreamer extends Daemon {
}
}
+ private boolean shouldHandleExternalError() {
+ return errorState.hasExternalError() && blockStream != null;
+ }
+
/**
* If this stream has encountered any errors, shutdown threads
* and mark the stream as closed.
*
* @return true if it should sleep for a while after returning.
*/
- private boolean processDatanodeError() throws IOException {
- if (!errorState.hasDatanodeError()) {
- return false;
- }
- if (errorState.hasExternalErrorOnly() && block == null) {
- // block is not yet initialized, handle external error later.
+ private boolean processDatanodeOrExternalError() throws IOException {
+ if (!errorState.hasDatanodeError() && !shouldHandleExternalError()) {
return false;
}
if (response != null) {
@@ -1103,7 +1117,8 @@ class DataStreamer extends Daemon {
return false;
}
}
- boolean doSleep = setupPipelineForAppendOrRecovery();
+
+ setupPipelineForAppendOrRecovery();
if (!streamerClosed && dfsClient.clientRunning) {
if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
@@ -1135,7 +1150,7 @@ class DataStreamer extends Daemon {
}
}
- return doSleep;
+ return false;
}
void setHflush() {
@@ -1266,7 +1281,7 @@ class DataStreamer extends Daemon {
* This happens when a file is appended or data streaming fails
* It keeps on trying until a pipeline is setup
*/
- private boolean setupPipelineForAppendOrRecovery() throws IOException {
+ private void setupPipelineForAppendOrRecovery() throws IOException {
// check number of datanodes
if (nodes == null || nodes.length == 0) {
String msg = "Could not get block locations. " + "Source file \""
@@ -1274,19 +1289,23 @@ class DataStreamer extends Daemon {
LOG.warn(msg);
lastException.set(new IOException(msg));
streamerClosed = true;
- return false;
+ return;
}
+ setupPipelineInternal(nodes, storageTypes);
+ }
+ protected void setupPipelineInternal(DatanodeInfo[] datanodes,
+ StorageType[] nodeStorageTypes) throws IOException {
boolean success = false;
long newGS = 0L;
while (!success && !streamerClosed && dfsClient.clientRunning) {
if (!handleRestartingDatanode()) {
- return false;
+ return;
}
- final boolean isRecovery = errorState.hasError();
+ final boolean isRecovery = errorState.hasInternalError();
if (!handleBadDatanode()) {
- return false;
+ return;
}
handleDatanodeReplacement();
@@ -1307,7 +1326,6 @@ class DataStreamer extends Daemon {
if (success) {
block = updatePipeline(newGS);
}
- return false; // do not sleep, continue processing
}
/**
@@ -1315,7 +1333,7 @@ class DataStreamer extends Daemon {
* This process is repeated until the deadline or the node starts back up.
* @return true if it should continue.
*/
- private boolean handleRestartingDatanode() {
+ boolean handleRestartingDatanode() {
if (errorState.isRestartingNode()) {
// 4 seconds or the configured deadline period, whichever is shorter.
// This is the retry interval and recovery will be retried in this
@@ -1338,7 +1356,7 @@ class DataStreamer extends Daemon {
* Remove bad node from list of nodes if badNodeIndex was set.
* @return true if it should continue.
*/
- private boolean handleBadDatanode() {
+ boolean handleBadDatanode() {
final int badNodeIndex = errorState.getBadNodeIndex();
if (badNodeIndex >= 0) {
if (nodes.length <= 1) {
@@ -1388,7 +1406,7 @@ class DataStreamer extends Daemon {
}
}
- private void failPacket4Testing() {
+ void failPacket4Testing() {
if (failPacket) { // for testing
failPacket = false;
try {
@@ -1400,13 +1418,8 @@ class DataStreamer extends Daemon {
}
}
- LocatedBlock updateBlockForPipeline() throws IOException {
- return callUpdateBlockForPipeline(block);
- }
-
- LocatedBlock callUpdateBlockForPipeline(ExtendedBlock newBlock) throws IOException {
- return dfsClient.namenode.updateBlockForPipeline(
- newBlock, dfsClient.clientName);
+ private LocatedBlock updateBlockForPipeline() throws IOException {
+ return dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
}
static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) {
@@ -1417,18 +1430,12 @@ class DataStreamer extends Daemon {
/** update pipeline at the namenode */
ExtendedBlock updatePipeline(long newGS) throws IOException {
final ExtendedBlock newBlock = newBlock(block, newGS);
- return callUpdatePipeline(block, newBlock, nodes, storageIDs);
- }
-
- ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock,
- DatanodeInfo[] newNodes, String[] newStorageIDs)
- throws IOException {
- dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, newBlock,
- newNodes, newStorageIDs);
+ dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
+ nodes, storageIDs);
return newBlock;
}
- int getNumBlockWriteRetry() {
+ private int getNumBlockWriteRetry() {
return dfsClient.getConf().getNumBlockWriteRetry();
}
@@ -1438,7 +1445,7 @@ class DataStreamer extends Daemon {
* Must get block ID and the IDs of the destinations from the namenode.
* Returns the list of target datanodes.
*/
- private LocatedBlock nextBlockOutputStream() throws IOException {
+ protected LocatedBlock nextBlockOutputStream() throws IOException {
LocatedBlock lb = null;
DatanodeInfo[] nodes = null;
StorageType[] storageTypes = null;
@@ -1446,9 +1453,8 @@ class DataStreamer extends Daemon {
boolean success = false;
ExtendedBlock oldBlock = block;
do {
- errorState.reset();
+ errorState.resetInternalError();
lastException.clear();
- success = false;
DatanodeInfo[] excluded =
excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
@@ -1488,7 +1494,7 @@ class DataStreamer extends Daemon {
// connects to the first datanode in the pipeline
// Returns true if success, otherwise return failure.
//
- private boolean createBlockOutputStream(DatanodeInfo[] nodes,
+ boolean createBlockOutputStream(DatanodeInfo[] nodes,
StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
if (nodes.length == 0) {
LOG.info("nodes are empty for write pipeline of " + block);
@@ -1567,7 +1573,7 @@ class DataStreamer extends Daemon {
assert null == blockStream : "Previous blockStream unclosed";
blockStream = out;
result = true; // success
- errorState.reset();
+ errorState.resetInternalError();
} catch (IOException ie) {
if (!errorState.isRestartingNode()) {
LOG.info("Exception in createBlockOutputStream " + this, ie);
@@ -1603,7 +1609,7 @@ class DataStreamer extends Daemon {
if (checkRestart && shouldWaitForRestart(i)) {
errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + nodes[i]);
}
- errorState.setError(true);
+ errorState.setInternalError();
lastException.set(ie);
result = false; // error
} finally {
@@ -1645,58 +1651,10 @@ class DataStreamer extends Daemon {
}
}
- LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+ private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException {
- final DfsClientConf conf = dfsClient.getConf();
- int retries = conf.getNumBlockWriteLocateFollowingRetry();
- long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
- while (true) {
- long localstart = Time.monotonicNow();
- while (true) {
- try {
- return dfsClient.namenode.addBlock(src, dfsClient.clientName,
- block, excludedNodes, stat.getFileId(), favoredNodes);
- } catch (RemoteException e) {
- IOException ue =
- e.unwrapRemoteException(FileNotFoundException.class,
- AccessControlException.class,
- NSQuotaExceededException.class,
- DSQuotaExceededException.class,
- QuotaByStorageTypeExceededException.class,
- UnresolvedPathException.class);
- if (ue != e) {
- throw ue; // no need to retry these exceptions
- }
-
-
- if (NotReplicatedYetException.class.getName().
- equals(e.getClassName())) {
- if (retries == 0) {
- throw e;
- } else {
- --retries;
- LOG.info("Exception while adding a block", e);
- long elapsed = Time.monotonicNow() - localstart;
- if (elapsed > 5000) {
- LOG.info("Waiting for replication for "
- + (elapsed / 1000) + " seconds");
- }
- try {
- LOG.warn("NotReplicatedYetException sleeping " + src
- + " retries left " + retries);
- Thread.sleep(sleeptime);
- sleeptime *= 2;
- } catch (InterruptedException ie) {
- LOG.warn("Caught exception", ie);
- }
- }
- } else {
- throw e;
- }
-
- }
- }
- }
+ return DFSOutputStream.addBlock(excludedNodes, dfsClient, src, block,
+ stat.getFileId(), favoredNodes);
}
/**
@@ -1755,6 +1713,10 @@ class DataStreamer extends Daemon {
return storageIDs;
}
+ BlockConstructionStage getStage() {
+ return stage;
+ }
+
/**
* return the token of the block
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index 2f83f7c..a313ecb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -19,18 +19,15 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator;
-import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
@@ -46,66 +43,8 @@ import com.google.common.annotations.VisibleForTesting;
* other streamers.
*/
public class StripedDataStreamer extends DataStreamer {
- /**
- * This class is designed for multiple threads to share a
- * {@link MultipleBlockingQueue}. Initially, the queue is empty. The earliest
- * thread calling poll populates entries to the queue and the other threads
- * will wait for it. Once the entries are populated, all the threads can poll
- * their entries.
- *
- * @param <T> the queue entry type.
- */
- static abstract class ConcurrentPoll<T> {
- final MultipleBlockingQueue<T> queue;
-
- ConcurrentPoll(MultipleBlockingQueue<T> queue) {
- this.queue = queue;
- }
-
- T poll(final int i) throws IOException {
- for(;;) {
- synchronized(queue) {
- final T polled = queue.poll(i);
- if (polled != null) { // already populated; return polled item.
- return polled;
- }
- if (isReady2Populate()) {
- try {
- populate();
- return queue.poll(i);
- } catch(IOException ioe) {
- LOG.warn("Failed to populate, " + this, ioe);
- throw ioe;
- }
- }
- }
-
- // sleep and then retry.
- sleep(100, "poll");
- }
- }
-
- boolean isReady2Populate() {
- return queue.isEmpty();
- }
-
- abstract void populate() throws IOException;
- }
-
- private static void sleep(long ms, String op) throws InterruptedIOException {
- try {
- Thread.sleep(ms);
- } catch(InterruptedException ie) {
- throw DFSUtil.toInterruptedIOException(
- "Sleep interrupted during " + op, ie);
- }
- }
-
private final Coordinator coordinator;
private final int index;
- private volatile boolean failed;
- private final ECSchema schema;
- private final int cellSize;
StripedDataStreamer(HdfsFileStatus stat,
DFSClient dfsClient, String src,
@@ -117,102 +56,59 @@ public class StripedDataStreamer extends DataStreamer {
byteArrayManage, favoredNodes);
this.index = index;
this.coordinator = coordinator;
- this.schema = stat.getErasureCodingPolicy().getSchema();
- this.cellSize = stat.getErasureCodingPolicy().getCellSize();
}
int getIndex() {
return index;
}
- void setFailed(boolean failed) {
- this.failed = failed;
- }
-
- boolean isFailed() {
- return failed;
- }
-
- private boolean isParityStreamer() {
- return index >= schema.getNumDataUnits();
+ boolean isHealthy() {
+ return !streamerClosed() && !getErrorState().hasInternalError();
}
@Override
protected void endBlock() {
- if (!isParityStreamer()) {
- coordinator.offerEndBlock(index, block);
- }
+ coordinator.offerEndBlock(index, block);
super.endBlock();
}
- @Override
- int getNumBlockWriteRetry() {
- return 0;
+ /**
+ * The upper level DFSStripedOutputStream allocates the new block group.
+ * Each striped data streamer only needs to fetch its internal block from
+ * the queue, which should already be ready.
+ */
+ private LocatedBlock getFollowingBlock() throws IOException {
+ if (!this.isHealthy()) {
+ // No internal block for this streamer, maybe there are not enough
+ // healthy DataNodes. Throw the exception that has been set by the
+ // DFSStripedOutputStream.
+ this.getLastException().check(false);
+ }
+ return coordinator.getFollowingBlocks().poll(index);
}
@Override
- LocatedBlock locateFollowingBlock(final DatanodeInfo[] excludedNodes)
- throws IOException {
- return new ConcurrentPoll<LocatedBlock>(coordinator.getFollowingBlocks()) {
- @Override
- boolean isReady2Populate() {
- return super.isReady2Populate()
- && (block == null || coordinator.hasAllEndBlocks());
- }
-
- @Override
- void populate() throws IOException {
- getLastException().check(false);
-
- if (block != null) {
- // set numByte for the previous block group
- long bytes = 0;
- for (int i = 0; i < schema.getNumDataUnits(); i++) {
- final ExtendedBlock b = coordinator.takeEndBlock(i);
- StripedBlockUtil.checkBlocks(index, block, i, b);
- bytes += b.getNumBytes();
- }
- block.setNumBytes(bytes);
- block.setBlockId(block.getBlockId() - index);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("locateFollowingBlock: index=" + index + ", block=" + block);
- }
-
- final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock(
- excludedNodes);
- if (lb.getLocations().length < schema.getNumDataUnits()) {
- throw new IOException(
- "Failed to get datablocks number of nodes from namenode: blockGroupSize= "
- + (schema.getNumDataUnits() + schema.getNumParityUnits())
- + ", blocks.length= " + lb.getLocations().length);
- }
- final LocatedBlock[] blocks =
- StripedBlockUtil.parseStripedBlockGroup((LocatedStripedBlock) lb,
- cellSize, schema.getNumDataUnits(), schema.getNumParityUnits());
-
- for (int i = 0; i < blocks.length; i++) {
- StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
- if (si.isFailed()) {
- continue; // skipping failed data streamer
- }
- if (blocks[i] == null) {
- // Set exception and close streamer as there is no block locations
- // found for the parity block.
- LOG.warn("Failed to get block location for parity block, index="
- + i);
- si.getLastException().set(
- new IOException("Failed to get following block, i=" + i));
- si.setFailed(true);
- si.endBlock();
- si.close(true);
- } else {
- queue.offer(i, blocks[i]);
- }
- }
- }
- }.poll(index);
+ protected LocatedBlock nextBlockOutputStream() throws IOException {
+ boolean success;
+ LocatedBlock lb = getFollowingBlock();
+ block = lb.getBlock();
+ block.setNumBytes(0);
+ bytesSent = 0;
+ accessToken = lb.getBlockToken();
+
+ DatanodeInfo[] nodes = lb.getLocations();
+ StorageType[] storageTypes = lb.getStorageTypes();
+
+ // Connect to the DataNode. If it fails, the internal error state will be set.
+ success = createBlockOutputStream(nodes, storageTypes, 0L, false);
+
+ if (!success) {
+ block = null;
+ final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()];
+ LOG.info("Excluding datanode " + badNode);
+ excludedNodes.put(badNode, badNode);
+ throw new IOException("Unable to create new block.");
+ }
+ return lb;
}
@VisibleForTesting
@@ -221,119 +117,71 @@ public class StripedDataStreamer extends DataStreamer {
}
@Override
- LocatedBlock updateBlockForPipeline() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("updateBlockForPipeline(), " + this);
- }
- return new ConcurrentPoll<LocatedBlock>(coordinator.getNewBlocks()) {
- @Override
- void populate() throws IOException {
- final ExtendedBlock bg = coordinator.getBlockGroup();
- final LocatedBlock updated = callUpdateBlockForPipeline(bg);
- final long newGS = updated.getBlock().getGenerationStamp();
- final LocatedBlock[] updatedBlks = StripedBlockUtil
- .parseStripedBlockGroup((LocatedStripedBlock) updated, cellSize,
- schema.getNumDataUnits(), schema.getNumParityUnits());
- for (int i = 0; i < schema.getNumDataUnits()
- + schema.getNumParityUnits(); i++) {
- StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
- if (si.isFailed()) {
- continue; // skipping failed data streamer
- }
- final ExtendedBlock bi = si.getBlock();
- if (bi != null) {
- final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
- null, null, null, -1, updated.isCorrupt(), null);
- lb.setBlockToken(updatedBlks[i].getBlockToken());
- queue.offer(i, lb);
- } else {
- final MultipleBlockingQueue<LocatedBlock> followingBlocks
- = coordinator.getFollowingBlocks();
- synchronized(followingBlocks) {
- final LocatedBlock lb = followingBlocks.peek(i);
- if (lb != null) {
- lb.getBlock().setGenerationStamp(newGS);
- si.getErrorState().reset();
- continue;
- }
- }
-
- //streamer i just have polled the block, sleep and retry.
- sleep(100, "updateBlockForPipeline, " + this);
- i--;
- }
- }
+ protected void setupPipelineInternal(DatanodeInfo[] nodes,
+ StorageType[] nodeStorageTypes) throws IOException {
+ boolean success = false;
+ while (!success && !streamerClosed() && dfsClient.clientRunning) {
+ if (!handleRestartingDatanode()) {
+ return;
+ }
+ if (!handleBadDatanode()) {
+ // for a striped streamer, if it is a datanode error then close the
+ // stream and return; there is no need to replace the datanode
+ return;
}
- }.poll(index);
- }
-
- @Override
- ExtendedBlock updatePipeline(final long newGS) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("updatePipeline(newGS=" + newGS + "), " + this);
- }
- return new ConcurrentPoll<ExtendedBlock>(coordinator.getUpdateBlocks()) {
- @Override
- void populate() throws IOException {
- final MultipleBlockingQueue<LocatedBlock> followingBlocks
- = coordinator.getFollowingBlocks();
- final ExtendedBlock bg = coordinator.getBlockGroup();
- final ExtendedBlock newBG = newBlock(bg, newGS);
- final int n = schema.getNumDataUnits() + schema.getNumParityUnits();
- final DatanodeInfo[] newNodes = new DatanodeInfo[n];
- final String[] newStorageIDs = new String[n];
- for (int i = 0; i < n; i++) {
- final StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
- DatanodeInfo[] nodes = si.getNodes();
- String[] storageIDs = si.getStorageIDs();
- if (nodes == null || storageIDs == null) {
- synchronized(followingBlocks) {
- final LocatedBlock lb = followingBlocks.peek(i);
- if (lb != null) {
- nodes = lb.getLocations();
- storageIDs = lb.getStorageIDs();
- }
- }
- }
- if (nodes != null && storageIDs != null) {
- newNodes[i] = nodes[0];
- newStorageIDs[i] = storageIDs[0];
- } else {
- //streamer i just have polled the block, sleep and retry.
- sleep(100, "updatePipeline, " + this);
- i--;
- }
+ // get a new generation stamp and an access token
+ final LocatedBlock lb = coordinator.getNewBlocks().take(index);
+ long newGS = lb.getBlock().getGenerationStamp();
+ setAccessToken(lb.getBlockToken());
+
+ // set up the pipeline again with the remaining nodes. When a striped
+ // data streamer comes here, it must be in the external error state.
+ assert getErrorState().hasExternalError();
+ success = createBlockOutputStream(nodes, nodeStorageTypes, newGS, true);
+
+ failPacket4Testing();
+ getErrorState().checkRestartingNodeDeadline(nodes);
+
+ // notify the coordinator of the result of createBlockOutputStream
+ synchronized (coordinator) {
+ if (!streamerClosed()) {
+ coordinator.updateStreamer(this, success);
+ coordinator.notify();
+ } else {
+ success = false;
}
- final ExtendedBlock updated = callUpdatePipeline(bg, newBG, newNodes,
- newStorageIDs);
-
- for (int i = 0; i < n; i++) {
- final StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
- final ExtendedBlock bi = si.getBlock();
- if (bi != null) {
- queue.offer(i, newBlock(bi, updated.getGenerationStamp()));
- } else if (!si.isFailed()) {
- synchronized(followingBlocks) {
- final LocatedBlock lb = followingBlocks.peek(i);
- if (lb != null) {
- lb.getBlock().setGenerationStamp(newGS);
- si.getErrorState().reset();
- continue;
- }
- }
+ }
- //streamer i just have polled the block, sleep and retry.
- sleep(100, "updatePipeline, " + this);
- i--;
- }
+ if (success) {
+ // wait for results of other streamers
+ success = coordinator.takeStreamerUpdateResult(index);
+ if (success) {
+ // if all succeeded, update its block using the new GS
+ block = newBlock(block, newGS);
+ } else {
+ // otherwise close the block stream and restart the recovery process
+ closeStream();
}
+ } else {
+ // if it failed, close the stream. The internal error state and last
+ // exception have already been set in createBlockOutputStream
+ // TODO: wait for restarting DataNodes during RollingUpgrade
+ closeStream();
+ setStreamerAsClosed();
}
- }.poll(index);
+ } // while
+ }
+
+ void setExternalError() {
+ getErrorState().setExternalError();
+ synchronized (dataQueue) {
+ dataQueue.notifyAll();
+ }
}
@Override
public String toString() {
- return "#" + index + ": " + (failed? "failed, ": "") + super.toString();
+ return "#" + index + ": " + (!isHealthy() ? "failed, ": "") + super.toString();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
index 0e92779..1d4cff3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
@@ -68,16 +68,28 @@ public class BlockUnderConstructionFeature {
/** Set expected locations */
public void setExpectedLocations(Block block, DatanodeStorageInfo[] targets,
boolean isStriped) {
- int numLocations = targets == null ? 0 : targets.length;
+ if (targets == null) {
+ return;
+ }
+ int numLocations = 0;
+ for (DatanodeStorageInfo target : targets) {
+ if (target != null) {
+ numLocations++;
+ }
+ }
+
this.replicas = new ReplicaUnderConstruction[numLocations];
- for(int i = 0; i < numLocations; i++) {
- // when creating a new striped block we simply sequentially assign block
- // index to each storage
- Block replicaBlock = isStriped ?
- new Block(block.getBlockId() + i, 0, block.getGenerationStamp()) :
- block;
- replicas[i] = new ReplicaUnderConstruction(replicaBlock, targets[i],
- ReplicaState.RBW);
+ int offset = 0;
+ for(int i = 0; i < targets.length; i++) {
+ if (targets[i] != null) {
+ // when creating a new striped block we simply sequentially assign block
+ // index to each storage
+ Block replicaBlock = isStriped ?
+ new Block(block.getBlockId() + i, 0, block.getGenerationStamp()) :
+ block;
+ replicas[offset++] = new ReplicaUnderConstruction(replicaBlock,
+ targets[i], ReplicaState.RBW);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index b5b3b97..61c6386 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -513,6 +513,10 @@ public class DatanodeManager {
}
final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[datanodeID.length];
for(int i = 0; i < datanodeID.length; i++) {
+ if (datanodeID[i].equals(DatanodeID.EMPTY_DATANODE_ID)) {
+ storages[i] = null;
+ continue;
+ }
final DatanodeDescriptor dd = getDatanode(datanodeID[i]);
storages[i] = dd.getStorageInfo(storageIDs[i]);
}
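On the namenode side, a slot reported as DatanodeID.EMPTY_DATANODE_ID now maps to a null storage instead of a failed datanode lookup, and those nulls are exactly what the setExpectedLocations change above skips over. A hypothetical sending-side helper, included only to show how such an array could be assembled; the actual client logic is not part of this hunk:

import org.apache.hadoop.hdfs.protocol.DatanodeID;

// Hypothetical: keep one slot per block index and fill a failed slot with the
// EMPTY_DATANODE_ID placeholder instead of dropping it, so the array position
// still encodes the block index within the group.
public class PipelinePlaceholderExample {
  static DatanodeID[] withPlaceholders(DatanodeID[] live, boolean[] healthy) {
    DatanodeID[] out = new DatanodeID[live.length];
    for (int i = 0; i < live.length; i++) {
      out[i] = healthy[i] ? live[i] : DatanodeID.EMPTY_DATANODE_ID;
    }
    return out;
  }
}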
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 5af3585..d49d39b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -925,22 +925,21 @@ public class StripedBlockUtil {
/**
* Check if the information such as IDs and generation stamps in block-i
- * match block-j, where block-i and block-j are in the same group.
+ * match the block group.
*/
- public static void checkBlocks(int j, ExtendedBlock blockj,
+ public static void checkBlocks(ExtendedBlock blockGroup,
int i, ExtendedBlock blocki) throws IOException {
-
- if (!blocki.getBlockPoolId().equals(blockj.getBlockPoolId())) {
- throw new IOException("Block pool IDs mismatched: block" + j + "="
- + blockj + ", block" + i + "=" + blocki);
+ if (!blocki.getBlockPoolId().equals(blockGroup.getBlockPoolId())) {
+ throw new IOException("Block pool IDs mismatched: block" + i + "="
+ + blocki + ", expected block group=" + blockGroup);
}
- if (blocki.getBlockId() - i != blockj.getBlockId() - j) {
- throw new IOException("Block IDs mismatched: block" + j + "="
- + blockj + ", block" + i + "=" + blocki);
+ if (blocki.getBlockId() - i != blockGroup.getBlockId()) {
+ throw new IOException("Block IDs mismatched: block" + i + "="
+ + blocki + ", expected block group=" + blockGroup);
}
- if (blocki.getGenerationStamp() != blockj.getGenerationStamp()) {
- throw new IOException("Generation stamps mismatched: block" + j + "="
- + blockj + ", block" + i + "=" + blocki);
+ if (blocki.getGenerationStamp() != blockGroup.getGenerationStamp()) {
+ throw new IOException("Generation stamps mismatched: block" + i + "="
+ + blocki + ", expected block group=" + blockGroup);
}
}
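checkBlocks now validates each internal block directly against the block group: the pool id and generation stamp must match, and subtracting the internal index i from the internal block's id must give back the group id, since internal block i is allotted groupId + i. A hypothetical end-to-end use, assuming the usual ExtendedBlock(poolId, blockId, numBytes, generationStamp) constructor and an illustrative group id:

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;

public class CheckBlocksExample {
  public static void main(String[] args) throws Exception {
    long groupId = -1024L;                                   // illustrative group id
    ExtendedBlock group = new ExtendedBlock("BP-1", groupId, 0L, 1001L);
    ExtendedBlock block3 = new ExtendedBlock("BP-1", groupId + 3, 0L, 1001L);
    // passes: same pool id, same generation stamp, and (groupId + 3) - 3 == groupId
    StripedBlockUtil.checkBlocks(group, 3, block3);
    // a mismatch in any of the three fields makes checkBlocks throw IOException
  }
}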
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 274d319..e621f26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1988,35 +1988,14 @@ public class DFSTestUtil {
*/
public static ExtendedBlock flushInternal(DFSStripedOutputStream out)
throws IOException {
- out.flushInternal();
+ out.flushAllInternals();
return out.getBlock();
}
- /**
- * Verify that blocks in striped block group are on different nodes, and every
- * internal blocks exists.
- */
- public static void verifyLocatedStripedBlocks(LocatedBlocks lbs,
- int groupSize) {
- for (LocatedBlock lb : lbs.getLocatedBlocks()) {
- assert lb instanceof LocatedStripedBlock;
- HashSet<DatanodeInfo> locs = new HashSet<>();
- for (DatanodeInfo datanodeInfo : lb.getLocations()) {
- locs.add(datanodeInfo);
- }
- assertEquals(groupSize, lb.getLocations().length);
- assertEquals(groupSize, locs.size());
-
- // verify that every internal blocks exists
- int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
- assertEquals(groupSize, blockIndices.length);
- HashSet<Integer> found = new HashSet<>();
- for (int index : blockIndices) {
- assert index >=0;
- found.add(index);
- }
- assertEquals(groupSize, found.size());
- }
+ public static ExtendedBlock flushBuffer(DFSStripedOutputStream out)
+ throws IOException {
+ out.flush();
+ return out.getBlock();
}
public static void waitForMetric(final JMXGet jmx, final String metricName, final int expectedValue)