Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/04 19:58:01 UTC
[27/50] hadoop git commit: HDFS-8166. DFSStripedOutputStream should not create empty blocks. Contributed by Jing Zhao.
HDFS-8166. DFSStripedOutputStream should not create empty blocks. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/567105ef
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/567105ef
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/567105ef
Branch: refs/heads/HDFS-7285
Commit: 567105ef3f58c94aa55d607e4d1d150c772bb167
Parents: 647173e
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Apr 17 17:55:19 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 4 10:13:26 2015 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/DFSStripedOutputStream.java | 163 +++++++++++--------
.../apache/hadoop/hdfs/StripedDataStreamer.java | 72 +++-----
.../server/blockmanagement/BlockManager.java | 17 +-
.../hadoop/hdfs/TestDFSStripedOutputStream.java | 162 +++++++++++-------
4 files changed, 236 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/567105ef/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index f11a657..7dc0091 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -22,10 +22,14 @@ import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -59,12 +63,12 @@ public class DFSStripedOutputStream extends DFSOutputStream {
*/
private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private ByteBuffer[] cellBuffers;
- private final short blockGroupBlocks = HdfsConstants.NUM_DATA_BLOCKS
+ private final short numAllBlocks = HdfsConstants.NUM_DATA_BLOCKS
+ HdfsConstants.NUM_PARITY_BLOCKS;
- private final short blockGroupDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+ private final short numDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
private int curIdx = 0;
/* bytes written in current block group */
- private long currentBlockGroupBytes = 0;
+ //private long currentBlockGroupBytes = 0;
//TODO: Use ErasureCoder interface (HDFS-7781)
private RawErasureEncoder encoder;
@@ -73,10 +77,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
return streamers.get(0);
}
- private long getBlockGroupSize() {
- return blockSize * HdfsConstants.NUM_DATA_BLOCKS;
- }
-
/** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
@@ -84,15 +84,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
throws IOException {
super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
DFSClient.LOG.info("Creating striped output stream");
- if (blockGroupBlocks <= 1) {
- throw new IOException("The block group must contain more than one block.");
- }
+ checkConfiguration();
- cellBuffers = new ByteBuffer[blockGroupBlocks];
+ cellBuffers = new ByteBuffer[numAllBlocks];
List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>();
- for (int i = 0; i < blockGroupBlocks; i++) {
- stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(blockGroupBlocks));
+ for (int i = 0; i < numAllBlocks; i++) {
+ stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(numAllBlocks));
try {
cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
} catch (InterruptedException ie) {
@@ -103,29 +101,38 @@ public class DFSStripedOutputStream extends DFSOutputStream {
}
}
encoder = new RSRawEncoder();
- encoder.initialize(blockGroupDataBlocks,
- blockGroupBlocks - blockGroupDataBlocks, cellSize);
+ encoder.initialize(numDataBlocks,
+ numAllBlocks - numDataBlocks, cellSize);
- streamers = new ArrayList<>(blockGroupBlocks);
- for (short i = 0; i < blockGroupBlocks; i++) {
+ List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
+ for (short i = 0; i < numAllBlocks; i++) {
StripedDataStreamer streamer = new StripedDataStreamer(stat, null,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
i, stripeBlocks);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
- streamers.add(streamer);
+ s.add(streamer);
}
+ streamers = Collections.unmodifiableList(s);
refreshStreamer();
}
+ private void checkConfiguration() {
+ if (cellSize % bytesPerChecksum != 0) {
+ throw new HadoopIllegalArgumentException("Invalid values: "
+ + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+ + ") must divide cell size (=" + cellSize + ").");
+ }
+ }
+
private void refreshStreamer() {
streamer = streamers.get(curIdx);
}
private void moveToNextStreamer() {
- curIdx = (curIdx + 1) % blockGroupBlocks;
+ curIdx = (curIdx + 1) % numAllBlocks;
refreshStreamer();
}
@@ -136,20 +143,21 @@ public class DFSStripedOutputStream extends DFSOutputStream {
* @param buffers data buffers + parity buffers
*/
private void encode(ByteBuffer[] buffers) {
- ByteBuffer[] dataBuffers = new ByteBuffer[blockGroupDataBlocks];
- ByteBuffer[] parityBuffers = new ByteBuffer[blockGroupBlocks - blockGroupDataBlocks];
- for (int i = 0; i < blockGroupBlocks; i++) {
- if (i < blockGroupDataBlocks) {
+ ByteBuffer[] dataBuffers = new ByteBuffer[numDataBlocks];
+ ByteBuffer[] parityBuffers = new ByteBuffer[numAllBlocks - numDataBlocks];
+ for (int i = 0; i < numAllBlocks; i++) {
+ if (i < numDataBlocks) {
dataBuffers[i] = buffers[i];
} else {
- parityBuffers[i - blockGroupDataBlocks] = buffers[i];
+ parityBuffers[i - numDataBlocks] = buffers[i];
}
}
encoder.encode(dataBuffers, parityBuffers);
}
/**
- * Generate packets from a given buffer
+ * Generate packets from a given buffer. This is only used for streamers
+ * writing parity blocks.
*
* @param byteBuffer the given buffer to generate packets
* @return packets generated
@@ -185,7 +193,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
throw new IOException(msg);
}
-
// If current packet has not been enqueued for transmission,
// but the cell buffer is full, we need to enqueue the packet
if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) {
@@ -213,13 +220,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
//When all data cells in a stripe are ready, we need to encode
//them and generate some parity cells. These cells will be
//converted to packets and put to their DataStreamer's queue.
- if (curIdx == blockGroupDataBlocks) {
+ if (curIdx == numDataBlocks) {
//encode the data cells
- for (int k = 0; k < blockGroupDataBlocks; k++) {
+ for (int k = 0; k < numDataBlocks; k++) {
cellBuffers[k].flip();
}
encode(cellBuffers);
- for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+ for (int i = numDataBlocks; i < numAllBlocks; i++) {
ByteBuffer parityBuffer = cellBuffers[i];
List<DFSPacket> packets = generatePackets(parityBuffer);
for (DFSPacket p : packets) {
@@ -245,13 +252,24 @@ public class DFSStripedOutputStream extends DFSOutputStream {
}
private void clearCellBuffers() {
- for (int i = 0; i< blockGroupBlocks; i++) {
+ for (int i = 0; i< numAllBlocks; i++) {
cellBuffers[i].clear();
+ if (i >= numDataBlocks) {
+ Arrays.fill(cellBuffers[i].array(), (byte) 0);
+ }
}
}
private int stripeDataSize() {
- return blockGroupDataBlocks * cellSize;
+ return numDataBlocks * cellSize;
+ }
+
+ private long getCurrentBlockGroupBytes() {
+ long sum = 0;
+ for (int i = 0; i < numDataBlocks; i++) {
+ sum += streamers.get(i).getBytesCurBlock();
+ }
+ return sum;
}
private void notSupported(String headMsg)
@@ -270,7 +288,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
notSupported("hsync");
}
-
@Override
protected synchronized void start() {
for (StripedDataStreamer streamer : streamers) {
@@ -302,15 +319,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
// interrupt datastreamer if force is true
@Override
protected void closeThreads(boolean force) throws IOException {
- StripedDataStreamer leadingStreamer = null;
for (StripedDataStreamer streamer : streamers) {
try {
streamer.close(force);
streamer.join();
streamer.closeSocket();
- if (streamer.isLeadingStreamer()) {
- leadingStreamer = streamer;
- }
} catch (InterruptedException e) {
throw new IOException("Failed to shutdown streamer");
} finally {
@@ -318,40 +331,26 @@ public class DFSStripedOutputStream extends DFSOutputStream {
setClosed();
}
}
- assert leadingStreamer != null : "One streamer should be leader";
- leadingStreamer.countTailingBlockGroupBytes();
- }
-
- @Override
- public synchronized void write(int b) throws IOException {
- super.write(b);
- currentBlockGroupBytes = (currentBlockGroupBytes + 1) % getBlockGroupSize();
- }
-
- @Override
- public synchronized void write(byte b[], int off, int len)
- throws IOException {
- super.write(b, off, len);
- currentBlockGroupBytes = (currentBlockGroupBytes + len) % getBlockGroupSize();
}
- private void writeParityCellsForLastStripe() throws IOException{
+ private void writeParityCellsForLastStripe() throws IOException {
+ final long currentBlockGroupBytes = getCurrentBlockGroupBytes();
long parityBlkSize = StripedBlockUtil.getInternalBlockLength(
- currentBlockGroupBytes, cellSize, blockGroupDataBlocks,
- blockGroupDataBlocks + 1);
+ currentBlockGroupBytes, cellSize, numDataBlocks,
+ numDataBlocks + 1);
if (parityBlkSize == 0 || currentBlockGroupBytes % stripeDataSize() == 0) {
return;
}
int parityCellSize = parityBlkSize % cellSize == 0 ? cellSize :
(int) (parityBlkSize % cellSize);
- for (int i = 0; i < blockGroupBlocks; i++) {
+ for (int i = 0; i < numAllBlocks; i++) {
long internalBlkLen = StripedBlockUtil.getInternalBlockLength(
- currentBlockGroupBytes, cellSize, blockGroupDataBlocks, i);
+ currentBlockGroupBytes, cellSize, numDataBlocks, i);
// Pad zero bytes to make all cells exactly the size of parityCellSize
// If internal block is smaller than parity block, pad zero bytes.
// Also pad zero bytes to all parity cells
- if (internalBlkLen < parityBlkSize || i >= blockGroupDataBlocks) {
+ if (internalBlkLen < parityBlkSize || i >= numDataBlocks) {
int position = cellBuffers[i].position();
assert position <= parityCellSize : "If an internal block is smaller" +
" than parity block, then its last cell should be small than last" +
@@ -365,9 +364,9 @@ public class DFSStripedOutputStream extends DFSOutputStream {
encode(cellBuffers);
//write parity cells
- curIdx = blockGroupDataBlocks;
+ curIdx = numDataBlocks;
refreshStreamer();
- for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+ for (int i = numDataBlocks; i < numAllBlocks; i++) {
ByteBuffer parityBuffer = cellBuffers[i];
List<DFSPacket> packets = generatePackets(parityBuffer);
for (DFSPacket p : packets) {
@@ -385,7 +384,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
@Override
void setClosed() {
super.setClosed();
- for (int i = 0; i < blockGroupBlocks; i++) {
+ for (int i = 0; i < numAllBlocks; i++) {
byteArrayManager.release(cellBuffers[i].array());
streamers.get(i).release();
}
@@ -395,10 +394,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
protected synchronized void closeImpl() throws IOException {
if (isClosed()) {
IOException e = getLeadingStreamer().getLastException().getAndSet(null);
- if (e == null)
- return;
- else
+ if (e != null) {
throw e;
+ } else {
+ return;
+ }
}
try {
@@ -408,14 +408,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
}
- //if the last stripe is incomplete, generate and write parity cells
+ // if the last stripe is incomplete, generate and write parity cells
writeParityCellsForLastStripe();
- for (int i = 0; i < blockGroupBlocks; i++) {
+ for (int i = 0; i < numAllBlocks; i++) {
curIdx = i;
refreshStreamer();
- if (streamer.getBytesCurBlock()!= 0 ||
- currentBlockGroupBytes < getBlockGroupSize()) {
+ if (streamer.getBytesCurBlock() > 0) {
// send an empty packet to mark the end of the block
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true);
@@ -425,9 +424,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
flushInternal();
}
- // get last block before destroying the streamer
- ExtendedBlock lastBlock = streamers.get(0).getBlock();
closeThreads(false);
+ final ExtendedBlock lastBlock = getCommittedBlock();
TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
try {
completeFile(lastBlock);
@@ -435,10 +433,35 @@ public class DFSStripedOutputStream extends DFSOutputStream {
scope.close();
}
dfsClient.endFileLease(fileId);
- } catch (ClosedChannelException e) {
+ } catch (ClosedChannelException ignored) {
} finally {
setClosed();
}
}
+ /**
+ * Generate the block which is reported and will be committed in NameNode.
+ * Need to go through all the streamers writing data blocks and add their
+ * bytesCurBlock together. Note that at this time all streamers have been
+ * closed. Also this calculation can cover streamers with writing failures.
+ *
+ * @return An ExtendedBlock with size of the whole block group.
+ */
+ ExtendedBlock getCommittedBlock() throws IOException {
+ ExtendedBlock b = getLeadingStreamer().getBlock();
+ if (b == null) {
+ return null;
+ }
+ final ExtendedBlock block = new ExtendedBlock(b);
+ final boolean atBlockGroupBoundary =
+ getLeadingStreamer().getBytesCurBlock() == 0 &&
+ getLeadingStreamer().getBlock() != null &&
+ getLeadingStreamer().getBlock().getNumBytes() > 0;
+ for (int i = 1; i < numDataBlocks; i++) {
+ block.setNumBytes(block.getNumBytes() +
+ (atBlockGroupBoundary ? streamers.get(i).getBlock().getNumBytes() :
+ streamers.get(i).getBytesCurBlock()));
+ }
+ return block;
+ }
}
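
The getCommittedBlock() javadoc above captures the core of this change: instead of maintaining currentBlockGroupBytes on every write(), the block group length reported to the NameNode is computed once at close time from the streamers themselves. Below is a minimal sketch of that computation, using a hypothetical stand-in type (Streamer) rather than the real StripedDataStreamer API:

    class CommittedSizeSketch {
      /** Hypothetical stand-in for the per-streamer state the real code reads. */
      static class Streamer {
        long bytesCurBlock;  // bytes written to the currently open block
        long blockNumBytes;  // length of the last block this streamer obtained
      }

      /** Mirrors getCommittedBlock(): fold data streamers into one group size. */
      static long committedGroupBytes(Streamer[] streamers, int numDataBlocks) {
        Streamer leader = streamers[0];
        // At a block group boundary the leader has already closed its block,
        // so the finished blocks' lengths are authoritative; mid-group, each
        // streamer's bytesCurBlock is.
        boolean atBoundary = leader.bytesCurBlock == 0 && leader.blockNumBytes > 0;
        long total = leader.blockNumBytes;
        for (int i = 1; i < numDataBlocks; i++) {
          total += atBoundary ? streamers[i].blockNumBytes
                              : streamers[i].bytesCurBlock;
        }
        return total;
      }
    }

This is also why the failure note in the javadoc holds: a streamer that hit a write failure simply contributes its smaller bytesCurBlock to the sum.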
http://git-wip-us.apache.org/repos/asf/hadoop/blob/567105ef/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index 5614852..19c205e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -37,6 +36,10 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
+
/****************************************************************************
* The StripedDataStreamer class is used by {@link DFSStripedOutputStream}.
* There are two kinds of StripedDataStreamer, leading streamer and ordinary
@@ -47,9 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
****************************************************************************/
public class StripedDataStreamer extends DataStreamer {
private final short index;
- private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
- private static short blockGroupSize = HdfsConstants.NUM_DATA_BLOCKS
- + HdfsConstants.NUM_PARITY_BLOCKS;
+ private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
private boolean hasCommittedBlock = false;
StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block,
@@ -88,66 +89,38 @@ public class StripedDataStreamer extends DataStreamer {
}
private boolean isParityStreamer() {
- return index >= HdfsConstants.NUM_DATA_BLOCKS;
+ return index >= NUM_DATA_BLOCKS;
}
@Override
protected void endBlock() {
if (!isLeadingStreamer() && !isParityStreamer()) {
- //before retrieving a new block, transfer the finished block to
- //leading streamer
+ // before retrieving a new block, transfer the finished block to
+ // leading streamer
LocatedBlock finishedBlock = new LocatedBlock(
new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(),
- block.getNumBytes(),block.getGenerationStamp()), null);
- try{
+ block.getNumBytes(), block.getGenerationStamp()), null);
+ try {
boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
TimeUnit.SECONDS);
- }catch (InterruptedException ie) {
- //TODO: Handle InterruptedException (HDFS-7786)
+ } catch (InterruptedException ie) {
+ // TODO: Handle InterruptedException (HDFS-7786)
}
}
super.endBlock();
}
- /**
- * This function is called after the streamer is closed.
- */
- void countTailingBlockGroupBytes () throws IOException {
- if (isLeadingStreamer()) {
- //when committing a block group, leading streamer has to adjust
- // {@link block} including the size of block group
- for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
- try {
- LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
- TimeUnit.SECONDS);
- if (finishedLocatedBlock == null) {
- throw new IOException("Fail to get finished LocatedBlock " +
- "from streamer, i=" + i);
- }
- ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
- long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
- if (block != null) {
- block.setNumBytes(block.getNumBytes() + bytes);
- }
- } catch (InterruptedException ie) {
- DFSClient.LOG.info("InterruptedException received when " +
- "putting a block to stripeBlocks, ie = " + ie);
- }
- }
- }
- }
-
@Override
protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException {
LocatedBlock lb = null;
if (isLeadingStreamer()) {
- if(hasCommittedBlock) {
+ if (hasCommittedBlock) {
/**
* when committing a block group, leading streamer has to adjust
* {@link block} to include the size of block group
*/
- for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
+ for (int i = 1; i < NUM_DATA_BLOCKS; i++) {
try {
LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
TimeUnit.SECONDS);
@@ -157,7 +130,7 @@ public class StripedDataStreamer extends DataStreamer {
}
ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
- if(block != null) {
+ if (block != null) {
block.setNumBytes(block.getNumBytes() + bytes);
}
} catch (InterruptedException ie) {
@@ -171,14 +144,13 @@ public class StripedDataStreamer extends DataStreamer {
hasCommittedBlock = true;
assert lb instanceof LocatedStripedBlock;
DFSClient.LOG.debug("Leading streamer obtained bg " + lb);
- LocatedBlock[] blocks = StripedBlockUtil.
- parseStripedBlockGroup((LocatedStripedBlock) lb,
- HdfsConstants.BLOCK_STRIPED_CELL_SIZE, HdfsConstants.NUM_DATA_BLOCKS,
- HdfsConstants.NUM_PARITY_BLOCKS
- );
- assert blocks.length == blockGroupSize :
+ LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+ (LocatedStripedBlock) lb, BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS,
+ NUM_PARITY_BLOCKS);
+ assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) :
"Fail to get block group from namenode: blockGroupSize: " +
- blockGroupSize + ", blocks.length: " + blocks.length;
+ (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " +
+ blocks.length;
lb = blocks[0];
for (int i = 1; i < blocks.length; i++) {
try {
@@ -199,7 +171,7 @@ public class StripedDataStreamer extends DataStreamer {
}
} else {
try {
- //wait 90 seconds to get a block from the queue
+ // wait 90 seconds to get a block from the queue
lb = stripedBlocks.get(index).poll(90, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
DFSClient.LOG.info("InterruptedException received when retrieving " +
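
The control flow above relies on a simple hand-off: every ordinary data streamer offers its finished block to queue 0 with a 30-second timeout, and the leading streamer polls the same queue and folds each finished block's length into the group size before asking the NameNode for the next block group. A compact sketch of just that pattern (hypothetical names; the real code passes LocatedBlock objects through stripedBlocks, not raw lengths):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    class StreamerHandoffSketch {
      // Stand-in for queue 0 in the real code: stripedBlocks.get(0).
      static final BlockingQueue<Long> finishedBlockBytes =
          new LinkedBlockingQueue<>();

      /** Ordinary streamer at endBlock(): report the finished block. */
      static void reportFinishedBlock(long numBytes) throws InterruptedException {
        finishedBlockBytes.offer(numBytes, 30, TimeUnit.SECONDS);
      }

      /** Leading streamer before requesting a new group: accumulate the size. */
      static long accumulateGroupBytes(long leaderBytes, int numDataBlocks)
          throws InterruptedException {
        long total = leaderBytes;
        for (int i = 1; i < numDataBlocks; i++) {
          Long bytes = finishedBlockBytes.poll(30, TimeUnit.SECONDS);
          if (bytes == null) {
            throw new IllegalStateException("missing finished block, i=" + i);
          }
          total += bytes;
        }
        return total;
      }
    }

Note the patch removes countTailingBlockGroupBytes() entirely; the equivalent accumulation at close time is now done by DFSStripedOutputStream#getCommittedBlock() instead.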
http://git-wip-us.apache.org/repos/asf/hadoop/blob/567105ef/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 01422db..a0f945b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -597,8 +598,20 @@ public class BlockManager {
}
public short getMinStorageNum(BlockInfo block) {
- return block.isStriped() ?
- ((BlockInfoStriped) block).getDataBlockNum() : minReplication;
+ if (block.isStriped()) {
+ final BlockInfoStriped sblock = (BlockInfoStriped) block;
+ short dataBlockNum = sblock.getDataBlockNum();
+ if (sblock.isComplete() ||
+ sblock.getBlockUCState() == BlockUCState.COMMITTED) {
+ // if the sblock is committed/completed and its length is less than a
+ // full stripe, the minimum storage number needs to be adjusted
+ dataBlockNum = (short) Math.min(dataBlockNum,
+ (sblock.getNumBytes() - 1) / HdfsConstants.BLOCK_STRIPED_CELL_SIZE + 1);
+ }
+ return dataBlockNum;
+ } else {
+ return minReplication;
+ }
}
public boolean hasMinStorage(BlockInfo block) {
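
The interesting case above is a committed or complete striped block shorter than a full stripe: such a group physically has fewer than getDataBlockNum() internal blocks, so the minimum storage count is capped at ceil(numBytes / cellSize). A sketch of just that arithmetic (hypothetical helper name; the 6-data-block schema with 64 KB cells is assumed only for the example):

    class MinStorageSketch {
      /** Cap required storages at the number of cells that actually hold data. */
      static short minStorageForStriped(long numBytes, short dataBlockNum,
          int cellSize) {
        // (numBytes - 1) / cellSize + 1 is ceil(numBytes / cellSize) for n > 0.
        return (short) Math.min(dataBlockNum, (numBytes - 1) / cellSize + 1);
      }

      public static void main(String[] args) {
        // A 100 KB committed group with 64 KB cells spans only 2 data blocks,
        // so 2 reported storages (not 6) are enough to complete the block.
        System.out.println(minStorageForStriped(100 * 1024, (short) 6, 64 * 1024));
      }
    }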
http://git-wip-us.apache.org/repos/asf/hadoop/blob/567105ef/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index 4a09bda..cc20f40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -1,5 +1,6 @@
package org.apache.hadoop.hdfs;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -20,6 +21,8 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.junit.After;
@@ -42,8 +45,8 @@ public class TestDFSStripedOutputStream {
private DistributedFileSystem fs;
private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final int stripesPerBlock = 4;
- int blockSize = cellSize * stripesPerBlock;
- private int mod = 29;
+ private final int blockSize = cellSize * stripesPerBlock;
+ private final RawErasureEncoder encoder = new RSRawEncoder();
@Before
public void setup() throws IOException {
@@ -53,6 +56,7 @@ public class TestDFSStripedOutputStream {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
fs = cluster.getFileSystem();
+ encoder.initialize(dataBlocks, parityBlocks, cellSize);
}
@After
@@ -144,60 +148,27 @@ public class TestDFSStripedOutputStream {
}
private byte getByte(long pos) {
+ int mod = 29;
return (byte) (pos % mod + 1);
}
- private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
- throws IOException {
- Path TestPath = new Path(src);
- byte[] bytes = generateBytes(writeBytes);
- DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
-
- //check file length
- FileStatus status = fs.getFileStatus(TestPath);
- long fileLength = status.getLen();
- if (fileLength != writeBytes) {
- Assert.fail("File Length error: expect=" + writeBytes
- + ", actual=" + fileLength);
- }
-
- DFSStripedInputStream dis = new DFSStripedInputStream(
- fs.getClient(), src, true);
- byte[] buf = new byte[writeBytes + 100];
- int readLen = dis.read(0, buf, 0, buf.length);
- readLen = readLen >= 0 ? readLen : 0;
- if (readLen != writeBytes) {
- Assert.fail("The length of file is not correct.");
- }
-
- for (int i = 0; i < writeBytes; i++) {
- if (getByte(i) != buf[i]) {
- Assert.fail("Byte at i = " + i + " is wrongly written.");
- }
- }
- }
-
private void testOneFile(String src, int writeBytes)
throws IOException {
- Path TestPath = new Path(src);
+ Path testPath = new Path(src);
- int allBlocks = dataBlocks + parityBlocks;
byte[] bytes = generateBytes(writeBytes);
- DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+ DFSTestUtil.writeFile(fs, testPath, new String(bytes));
- //check file length
- FileStatus status = fs.getFileStatus(TestPath);
+ // check file length
+ FileStatus status = fs.getFileStatus(testPath);
long fileLength = status.getLen();
- if (fileLength != writeBytes) {
- Assert.fail("File Length error: expect=" + writeBytes
- + ", actual=" + fileLength);
- }
+ Assert.assertEquals(writeBytes, fileLength);
List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
- assert firstBlock instanceof LocatedStripedBlock;
+ Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
LocatedBlock[] blocks = StripedBlockUtil.
parseStripedBlockGroup((LocatedStripedBlock) firstBlock,
cellSize, dataBlocks, parityBlocks);
@@ -205,15 +176,14 @@ public class TestDFSStripedOutputStream {
blockGroupList.add(oneGroup);
}
- //test each block group
+ // test each block group
for (int group = 0; group < blockGroupList.size(); group++) {
//get the data of this block
List<LocatedBlock> blockList = blockGroupList.get(group);
byte[][] dataBlockBytes = new byte[dataBlocks][];
- byte[][] parityBlockBytes = new byte[allBlocks - dataBlocks][];
+ byte[][] parityBlockBytes = new byte[parityBlocks][];
-
- //for each block, use BlockReader to read data
+ // for each block, use BlockReader to read data
for (int i = 0; i < blockList.size(); i++) {
LocatedBlock lblock = blockList.get(i);
if (lblock == null) {
@@ -269,19 +239,20 @@ public class TestDFSStripedOutputStream {
}
}).build();
- blockReader.readAll(blockBytes, 0, (int)block.getNumBytes());
+ blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
blockReader.close();
}
- //check if we write the data correctly
- for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length; blkIdxInGroup++) {
- byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup];
+ // check if we write the data correctly
+ for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length;
+ blkIdxInGroup++) {
+ final byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup];
if (actualBlkBytes == null) {
continue;
}
for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) {
byte expected;
- //calculate the postion of this byte in the file
+ // calculate the position of this byte in the file
long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize,
dataBlocks, posInBlk, blkIdxInGroup) +
group * blockSize * dataBlocks;
@@ -291,15 +262,94 @@ public class TestDFSStripedOutputStream {
expected = getByte(posInFile);
}
- if (expected != actualBlkBytes[posInBlk]) {
- Assert.fail("Unexpected byte " + actualBlkBytes[posInBlk] + ", expect " + expected
- + ". Block group index is " + group +
- ", stripe index is " + posInBlk / cellSize +
- ", cell index is " + blkIdxInGroup + ", byte index is " + posInBlk % cellSize);
+ String s = "Unexpected byte " + actualBlkBytes[posInBlk]
+ + ", expect " + expected
+ + ". Block group index is " + group
+ + ", stripe index is " + posInBlk / cellSize
+ + ", cell index is " + blkIdxInGroup
+ + ", byte index is " + posInBlk % cellSize;
+ Assert.assertEquals(s, expected, actualBlkBytes[posInBlk]);
+ }
+ }
+
+ // verify the parity blocks
+ final ByteBuffer[] parityBuffers = new ByteBuffer[parityBlocks];
+ final long groupSize = lbs.getLocatedBlocks().get(group).getBlockSize();
+ int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(groupSize,
+ cellSize, dataBlocks, dataBlocks);
+ for (int i = 0; i < parityBlocks; i++) {
+ parityBuffers[i] = ByteBuffer.allocate(parityBlkSize);
+ }
+ final int numStripes = (int) (groupSize - 1) / stripeDataSize() + 1;
+ for (int i = 0; i < numStripes; i++) {
+ final int parityCellSize = i < numStripes - 1 || parityBlkSize % cellSize == 0
+ ? cellSize : parityBlkSize % cellSize;
+ ByteBuffer[] stripeBuf = new ByteBuffer[dataBlocks];
+ for (int k = 0; k < stripeBuf.length; k++) {
+ stripeBuf[k] = ByteBuffer.allocate(cellSize);
+ }
+ for (int j = 0; j < dataBlocks; j++) {
+ if (dataBlockBytes[j] != null) {
+ int length = Math.min(cellSize,
+ dataBlockBytes[j].length - cellSize * i);
+ if (length > 0) {
+ stripeBuf[j].put(dataBlockBytes[j], cellSize * i, length);
+ }
+ }
+ final long pos = stripeBuf[j].position();
+ for (int k = 0; k < parityCellSize - pos; k++) {
+ stripeBuf[j].put((byte) 0);
}
+ stripeBuf[j].flip();
}
+ ByteBuffer[] parityBuf = new ByteBuffer[parityBlocks];
+ for (int j = 0; j < parityBlocks; j++) {
+ parityBuf[j] = ByteBuffer.allocate(cellSize);
+ for (int k = 0; k < parityCellSize; k++) {
+ parityBuf[j].put((byte) 0);
+ }
+ parityBuf[j].flip();
+ }
+
+ encoder.encode(stripeBuf, parityBuf);
+ for (int j = 0; j < parityBlocks; j++) {
+ parityBuffers[j].put(parityBuf[j]);
+ }
+ }
+
+ for (int i = 0; i < parityBlocks; i++) {
+ Assert.assertArrayEquals(parityBuffers[i].array(), parityBlockBytes[i]);
}
}
}
+ private void testReadWriteOneFile(String src, int writeBytes)
+ throws IOException {
+ Path TestPath = new Path(src);
+ byte[] bytes = generateBytes(writeBytes);
+ DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+
+ //check file length
+ FileStatus status = fs.getFileStatus(TestPath);
+ long fileLength = status.getLen();
+ if (fileLength != writeBytes) {
+ Assert.fail("File Length error: expect=" + writeBytes
+ + ", actual=" + fileLength);
+ }
+
+ DFSStripedInputStream dis = new DFSStripedInputStream(
+ fs.getClient(), src, true);
+ byte[] buf = new byte[writeBytes + 100];
+ int readLen = dis.read(0, buf, 0, buf.length);
+ readLen = readLen >= 0 ? readLen : 0;
+ if (readLen != writeBytes) {
+ Assert.fail("The length of file is not correct.");
+ }
+
+ for (int i = 0; i < writeBytes; i++) {
+ if (getByte(i) != buf[i]) {
+ Assert.fail("Byte at i = " + i + " is wrongly written.");
+ }
+ }
+ }
}
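
For reference, the parity verification added to testOneFile() condenses to the per-stripe loop below: copy each data cell, zero-pad short cells up to the stripe's parity cell size, run the raw RS encoder, and compare against the bytes read back from the parity blocks. This is a hedged standalone sketch (hypothetical method name; the encoder is assumed to be initialized as in setup(), via encoder.initialize(dataBlocks, parityBlocks, cellSize)):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

    class ParityCheckSketch {
      static void verifyStripe(RawErasureEncoder encoder, byte[][] dataBlockBytes,
          byte[][] parityBlockBytes, int stripe, int cellSize, int parityCellSize,
          int dataBlocks, int parityBlocks) {
        // Rebuild the stripe's data cells, zero-padded to parityCellSize.
        ByteBuffer[] dataCells = new ByteBuffer[dataBlocks];
        for (int j = 0; j < dataBlocks; j++) {
          dataCells[j] = ByteBuffer.allocate(cellSize);
          int off = stripe * cellSize;
          int len = dataBlockBytes[j] == null ? 0
              : Math.min(cellSize, dataBlockBytes[j].length - off);
          if (len > 0) {
            dataCells[j].put(dataBlockBytes[j], off, len);
          }
          while (dataCells[j].position() < parityCellSize) {
            dataCells[j].put((byte) 0);  // pad: the last stripe may be short
          }
          dataCells[j].flip();
        }
        // Zero-filled output cells, sized like the test's parityBuf.
        ByteBuffer[] parityCells = new ByteBuffer[parityBlocks];
        for (int j = 0; j < parityBlocks; j++) {
          parityCells[j] = ByteBuffer.allocate(cellSize);
          for (int k = 0; k < parityCellSize; k++) {
            parityCells[j].put((byte) 0);
          }
          parityCells[j].flip();
        }
        encoder.encode(dataCells, parityCells);
        // Each stripe's parity cell lives at offset stripe * cellSize.
        for (int j = 0; j < parityBlocks; j++) {
          for (int k = 0; k < parityCellSize; k++) {
            if (parityCells[j].get(k) != parityBlockBytes[j][stripe * cellSize + k]) {
              throw new AssertionError("parity mismatch: stripe=" + stripe
                  + ", parity block=" + j + ", byte=" + k);
            }
          }
        }
      }
    }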