Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/26 21:34:38 UTC
[08/50] [abbrv] hadoop git commit: HDFS-7672. Handle write failure
for striping blocks and refactor the existing code in DFSStripedOutputStream
and StripedDataStreamer.
HDFS-7672. Handle write failure for striping blocks and refactor the existing code in DFSStripedOutputStream and StripedDataStreamer.
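In short, the patch lets each StripedDataStreamer fail independently: a failed streamer is marked and skipped, and the write aborts only once fewer than numDataBlocks healthy streamers remain. A minimal, self-contained sketch of that tolerance rule, with simplified names standing in for the real fields (the actual check is DFSStripedOutputStream#checkStreamers in the diff below):

    import java.io.IOException;
    import java.util.List;

    class StreamerHealthCheck {
      /** Abort only when fewer healthy streamers remain than there are data blocks. */
      static void check(List<Boolean> failedFlags, int numDataBlocks) throws IOException {
        int healthy = 0;
        for (boolean failed : failedFlags) {
          if (!failed) {
            healthy++;
          }
        }
        if (healthy < numDataBlocks) {
          throw new IOException("Failed: the number of remaining blocks = "
              + healthy + " < the number of data blocks = " + numDataBlocks);
        }
      }
    }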
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/220ca960
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/220ca960
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/220ca960
Branch: refs/heads/HDFS-7285
Commit: 220ca960bce970d5969b9af570a3ce43360b7e2b
Parents: e849be2
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue May 5 16:26:49 2015 -0700
Committer: Zhe Zhang <zh...@cloudera.com>
Committed: Tue May 26 12:00:45 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 69 +--
.../hadoop/hdfs/DFSStripedOutputStream.java | 501 ++++++++++++-------
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 11 +-
.../org/apache/hadoop/hdfs/DataStreamer.java | 15 +-
.../apache/hadoop/hdfs/StripedDataStreamer.java | 156 ++----
.../hdfs/server/namenode/FSDirectory.java | 2 +-
.../org/apache/hadoop/hdfs/MiniDFSCluster.java | 2 -
.../hadoop/hdfs/TestDFSStripedOutputStream.java | 18 +-
.../TestDFSStripedOutputStreamWithFailure.java | 323 ++++++++++++
10 files changed, 765 insertions(+), 335 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/220ca960/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index a8df3f2..7efaa5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -172,3 +172,6 @@
HDFS-8324. Add trace info to DFSClient#getErasureCodingZoneInfo(..) (vinayakumarb via
umamahesh)
+
+ HDFS-7672. Handle write failure for striping blocks and refactor the
+ existing code in DFSStripedOutputStream and StripedDataStreamer. (szetszwo)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/220ca960/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 0280d71..8580357 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -24,6 +24,8 @@ import java.nio.channels.ClosedChannelException;
import java.util.EnumSet;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
@@ -86,6 +88,8 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.Private
public class DFSOutputStream extends FSOutputSummer
implements Syncable, CanSetDropBehind {
+ static final Log LOG = LogFactory.getLog(DFSOutputStream.class);
+
/**
* Number of times to retry creating a file when there are transient
* errors (typically related to encryption zones and KeyProvider operations).
@@ -419,24 +423,35 @@ public class DFSOutputStream extends FSOutputSummer
streamer.incBytesCurBlock(len);
// If packet is full, enqueue it for transmission
- //
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
streamer.getBytesCurBlock() == blockSize) {
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
- currentPacket.getSeqno() +
- ", src=" + src +
- ", bytesCurBlock=" + streamer.getBytesCurBlock() +
- ", blockSize=" + blockSize +
- ", appendChunk=" + streamer.getAppendChunk());
- }
- streamer.waitAndQueuePacket(currentPacket);
- currentPacket = null;
+ enqueueCurrentPacketFull();
+ }
+ }
- adjustChunkBoundary();
+ void enqueueCurrentPacket() throws IOException {
+ streamer.waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ }
- endBlock();
+ void enqueueCurrentPacketFull() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("enqueue full " + currentPacket + ", src=" + src
+ + ", bytesCurBlock=" + streamer.getBytesCurBlock()
+ + ", blockSize=" + blockSize
+ + ", appendChunk=" + streamer.getAppendChunk()
+ + ", " + streamer);
}
+ enqueueCurrentPacket();
+ adjustChunkBoundary();
+ endBlock();
+ }
+
+ /** create an empty packet to mark the end of the block */
+ void setCurrentPacket2Empty() throws InterruptedIOException {
+ currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
+ streamer.getAndIncCurrentSeqno(), true);
+ currentPacket.setSyncBlock(shouldSyncBlock);
}
/**
@@ -444,7 +459,7 @@ public class DFSOutputStream extends FSOutputSummer
* write filled up its partial chunk. Tell the summer to generate full
* crc chunks from now on.
*/
- protected void adjustChunkBoundary() {
+ private void adjustChunkBoundary() {
if (streamer.getAppendChunk() &&
streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
streamer.setAppendChunk(false);
@@ -466,11 +481,8 @@ public class DFSOutputStream extends FSOutputSummer
*/
protected void endBlock() throws IOException {
if (streamer.getBytesCurBlock() == blockSize) {
- currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
- streamer.getAndIncCurrentSeqno(), true);
- currentPacket.setSyncBlock(shouldSyncBlock);
- streamer.waitAndQueuePacket(currentPacket);
- currentPacket = null;
+ setCurrentPacket2Empty();
+ enqueueCurrentPacket();
streamer.setBytesCurBlock(0);
lastFlushOffset = 0;
}
@@ -592,8 +604,7 @@ public class DFSOutputStream extends FSOutputSummer
}
if (currentPacket != null) {
currentPacket.setSyncBlock(isSync);
- streamer.waitAndQueuePacket(currentPacket);
- currentPacket = null;
+ enqueueCurrentPacket();
}
if (endBlock && streamer.getBytesCurBlock() > 0) {
// Need to end the current block, thus send an empty packet to
@@ -601,8 +612,7 @@ public class DFSOutputStream extends FSOutputSummer
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock || isSync);
- streamer.waitAndQueuePacket(currentPacket);
- currentPacket = null;
+ enqueueCurrentPacket();
streamer.setBytesCurBlock(0);
lastFlushOffset = 0;
} else {
@@ -779,15 +789,11 @@ public class DFSOutputStream extends FSOutputSummer
flushBuffer(); // flush from all upper layers
if (currentPacket != null) {
- streamer.waitAndQueuePacket(currentPacket);
- currentPacket = null;
+ enqueueCurrentPacket();
}
if (streamer.getBytesCurBlock() != 0) {
- // send an empty packet to mark the end of the block
- currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
- streamer.getAndIncCurrentSeqno(), true);
- currentPacket.setSyncBlock(shouldSyncBlock);
+ setCurrentPacket2Empty();
}
flushInternal(); // flush all data to Datanodes
@@ -901,4 +907,9 @@ public class DFSOutputStream extends FSOutputSummer
public long getFileId() {
return fileId;
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" + streamer;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/220ca960/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 71cdbb9..bbc8ba0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -28,14 +28,16 @@ import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.util.DataChecksum;
@@ -44,6 +46,8 @@ import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
+import com.google.common.base.Preconditions;
+
/****************************************************************
* The DFSStripedOutputStream class supports writing files in striped
@@ -55,33 +59,154 @@ import org.apache.htrace.TraceScope;
@InterfaceAudience.Private
public class DFSStripedOutputStream extends DFSOutputStream {
+ /** Coordinate the communication between the streamers. */
+ static class Coordinator {
+ private final List<BlockingQueue<ExtendedBlock>> endBlocks;
+ private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
+ private volatile boolean shouldLocateFollowingBlock = false;
+
+ Coordinator(final int numDataBlocks, final int numAllBlocks) {
+ endBlocks = new ArrayList<>(numDataBlocks);
+ for (int i = 0; i < numDataBlocks; i++) {
+ endBlocks.add(new LinkedBlockingQueue<ExtendedBlock>(1));
+ }
- private final List<StripedDataStreamer> streamers;
- /**
- * Size of each striping cell, must be a multiple of bytesPerChecksum
- */
- private final ECInfo ecInfo;
- private final int cellSize;
- // checksum buffer, we only need to calculate checksum for parity blocks
- private byte[] checksumBuf;
- private ByteBuffer[] cellBuffers;
+ stripedBlocks = new ArrayList<>(numAllBlocks);
+ for (int i = 0; i < numAllBlocks; i++) {
+ stripedBlocks.add(new LinkedBlockingQueue<LocatedBlock>(1));
+ }
+ }
- private final short numAllBlocks;
- private final short numDataBlocks;
+ boolean shouldLocateFollowingBlock() {
+ return shouldLocateFollowingBlock;
+ }
- private int curIdx = 0;
- /* bytes written in current block group */
- //private long currentBlockGroupBytes = 0;
+ void putEndBlock(int i, ExtendedBlock block) {
+ shouldLocateFollowingBlock = true;
- //TODO: Use ErasureCoder interface (HDFS-7781)
- private RawErasureEncoder encoder;
+ final boolean b = endBlocks.get(i).offer(block);
+ Preconditions.checkState(b, "Failed to add " + block
+ + " to endBlocks queue, i=" + i);
+ }
- private StripedDataStreamer getLeadingStreamer() {
- return streamers.get(0);
+ ExtendedBlock getEndBlock(int i) throws InterruptedIOException {
+ try {
+ return endBlocks.get(i).poll(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw DFSUtil.toInterruptedIOException(
+ "getEndBlock interrupted, i=" + i, e);
+ }
+ }
+
+ void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
+ ExtendedBlock b = endBlocks.get(i).peek();
+ if (b == null) {
+ // streamer just has failed, put end block and continue
+ b = block;
+ putEndBlock(i, b);
+ }
+ b.setNumBytes(newBytes);
+ }
+
+ void putStripedBlock(int i, LocatedBlock block) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putStripedBlock " + block + ", i=" + i);
+ }
+ final boolean b = stripedBlocks.get(i).offer(block);
+ if (!b) {
+ throw new IOException("Failed: " + block + ", i=" + i);
+ }
+ }
+
+ LocatedBlock getStripedBlock(int i) throws IOException {
+ final LocatedBlock lb;
+ try {
+ lb = stripedBlocks.get(i).poll(90, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw DFSUtil.toInterruptedIOException("getStripedBlock interrupted", e);
+ }
+
+ if (lb == null) {
+ throw new IOException("Failed: i=" + i);
+ }
+ return lb;
+ }
}
- private long getBlockGroupSize() {
- return blockSize * numDataBlocks;
+ /** Buffers for writing the data and parity cells of a stripe. */
+ class CellBuffers {
+ private final ByteBuffer[] buffers;
+ private final byte[][] checksumArrays;
+
+ CellBuffers(int numParityBlocks) throws InterruptedException{
+ if (cellSize % bytesPerChecksum != 0) {
+ throw new HadoopIllegalArgumentException("Invalid values: "
+ + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (="
+ + bytesPerChecksum + ") must divide cell size (=" + cellSize + ").");
+ }
+
+ checksumArrays = new byte[numParityBlocks][];
+ final int size = getChecksumSize() * (cellSize / bytesPerChecksum);
+ for (int i = 0; i < checksumArrays.length; i++) {
+ checksumArrays[i] = new byte[size];
+ }
+
+ buffers = new ByteBuffer[numAllBlocks];
+ for (int i = 0; i < buffers.length; i++) {
+ buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
+ }
+ }
+
+ private ByteBuffer[] getBuffers() {
+ return buffers;
+ }
+
+ byte[] getChecksumArray(int i) {
+ return checksumArrays[i - numDataBlocks];
+ }
+
+ private int addTo(int i, byte[] b, int off, int len) {
+ final ByteBuffer buf = buffers[i];
+ final int pos = buf.position() + len;
+ Preconditions.checkState(pos <= cellSize);
+ buf.put(b, off, len);
+ return pos;
+ }
+
+ private void clear() {
+ for (int i = 0; i< numAllBlocks; i++) {
+ buffers[i].clear();
+ if (i >= numDataBlocks) {
+ Arrays.fill(buffers[i].array(), (byte) 0);
+ }
+ }
+ }
+
+ private void release() {
+ for (int i = 0; i < numAllBlocks; i++) {
+ byteArrayManager.release(buffers[i].array());
+ }
+ }
+
+ private void flipDataBuffers() {
+ for (int i = 0; i < numDataBlocks; i++) {
+ buffers[i].flip();
+ }
+ }
+ }
+
+ private final Coordinator coordinator;
+ private final CellBuffers cellBuffers;
+ private final RawErasureEncoder encoder;
+ private final List<StripedDataStreamer> streamers;
+
+ /** Size of each striping cell, must be a multiple of bytesPerChecksum */
+ private final int cellSize;
+ private final int numAllBlocks;
+ private final int numDataBlocks;
+
+ private StripedDataStreamer getLeadingStreamer() {
+ return streamers.get(0);
}
/** Construct a new output stream for creating a file. */
@@ -90,82 +215,94 @@ public class DFSStripedOutputStream extends DFSOutputStream {
DataChecksum checksum, String[] favoredNodes)
throws IOException {
super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
- DFSClient.LOG.info("Creating striped output stream");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating DFSStripedOutputStream for " + src);
+ }
// ECInfo is restored from NN just before writing striped files.
- ecInfo = dfsClient.getErasureCodingInfo(src);
- cellSize = ecInfo.getSchema().getChunkSize();
- numAllBlocks = (short)(ecInfo.getSchema().getNumDataUnits()
- + ecInfo.getSchema().getNumParityUnits());
- numDataBlocks = (short)ecInfo.getSchema().getNumDataUnits();
+ //TODO reduce an rpc call HDFS-8289
+ final ECSchema schema = dfsClient.getErasureCodingInfo(src).getSchema();
+ final int numParityBlocks = schema.getNumParityUnits();
+ cellSize = schema.getChunkSize();
+ numDataBlocks = schema.getNumDataUnits();
+ numAllBlocks = numDataBlocks + numParityBlocks;
- checkConfiguration();
-
- checksumBuf = new byte[getChecksumSize() * (cellSize / bytesPerChecksum)];
- cellBuffers = new ByteBuffer[numAllBlocks];
- List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>();
+ encoder = new RSRawEncoder();
+ encoder.initialize(numDataBlocks, numParityBlocks, cellSize);
- for (int i = 0; i < numAllBlocks; i++) {
- stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(numAllBlocks));
- try {
- cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
- } catch (InterruptedException ie) {
- final InterruptedIOException iioe = new InterruptedIOException(
- "create cell buffers");
- iioe.initCause(ie);
- throw iioe;
- }
+ coordinator = new Coordinator(numDataBlocks, numAllBlocks);
+ try {
+ cellBuffers = new CellBuffers(numParityBlocks);
+ } catch (InterruptedException ie) {
+ throw DFSUtil.toInterruptedIOException("Failed to create cell buffers", ie);
}
- encoder = new RSRawEncoder();
- encoder.initialize(numDataBlocks,
- numAllBlocks - numDataBlocks, cellSize);
List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
for (short i = 0; i < numAllBlocks; i++) {
- StripedDataStreamer streamer = new StripedDataStreamer(stat, null,
+ StripedDataStreamer streamer = new StripedDataStreamer(stat,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
- i, stripeBlocks, favoredNodes);
+ favoredNodes, i, coordinator);
s.add(streamer);
}
streamers = Collections.unmodifiableList(s);
+ setCurrentStreamer(0);
+ }
- refreshStreamer();
+ StripedDataStreamer getStripedDataStreamer(int i) {
+ return streamers.get(i);
}
- private void checkConfiguration() {
- if (cellSize % bytesPerChecksum != 0) {
- throw new HadoopIllegalArgumentException("Invalid values: "
- + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
- + ") must divide cell size (=" + cellSize + ").");
- }
+ int getCurrentIndex() {
+ return getCurrentStreamer().getIndex();
}
- private void refreshStreamer() {
- streamer = streamers.get(curIdx);
+ StripedDataStreamer getCurrentStreamer() {
+ return (StripedDataStreamer)streamer;
}
- private void moveToNextStreamer() {
- curIdx = (curIdx + 1) % numAllBlocks;
- refreshStreamer();
+ private StripedDataStreamer setCurrentStreamer(int i) {
+ streamer = streamers.get(i);
+ return getCurrentStreamer();
}
/**
- * encode the buffers.
- * After encoding, flip each buffer.
+ * Encode the buffers, i.e. compute parities.
*
* @param buffers data buffers + parity buffers
*/
- private void encode(ByteBuffer[] buffers) {
- ByteBuffer[] dataBuffers = new ByteBuffer[numDataBlocks];
- ByteBuffer[] parityBuffers = new ByteBuffer[numAllBlocks - numDataBlocks];
- for (int i = 0; i < numAllBlocks; i++) {
- if (i < numDataBlocks) {
- dataBuffers[i] = buffers[i];
- } else {
- parityBuffers[i - numDataBlocks] = buffers[i];
+ private static void encode(RawErasureEncoder encoder, int numData,
+ ByteBuffer[] buffers) {
+ final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
+ final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
+ System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);
+ System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length);
+
+ encoder.encode(dataBuffers, parityBuffers);
+ }
+
+
+ private void checkStreamers() throws IOException {
+ int count = 0;
+ for(StripedDataStreamer s : streamers) {
+ if (!s.isFailed()) {
+ count++;
}
}
- encoder.encode(dataBuffers, parityBuffers);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("checkStreamers: " + streamers);
+ LOG.debug("count=" + count);
+ }
+ if (count < numDataBlocks) {
+ throw new IOException("Failed: the number of remaining blocks = "
+ + count + " < the number of data blocks = " + numDataBlocks);
+ }
+ }
+
+ private void handleStreamerFailure(String err, Exception e) throws IOException {
+ LOG.warn("Failed: " + err + ", " + this, e);
+ getCurrentStreamer().setIsFailed(true);
+ checkStreamers();
+ currentPacket = null;
}
/**
@@ -173,11 +310,12 @@ public class DFSStripedOutputStream extends DFSOutputStream {
* writing parity blocks.
*
* @param byteBuffer the given buffer to generate packets
+ * @param checksumBuf the checksum buffer
* @return packets generated
* @throws IOException
*/
- private List<DFSPacket> generatePackets(ByteBuffer byteBuffer)
- throws IOException{
+ private List<DFSPacket> generatePackets(
+ ByteBuffer byteBuffer, byte[] checksumBuf) throws IOException{
List<DFSPacket> packets = new ArrayList<>();
assert byteBuffer.hasArray();
getDataChecksum().calculateChunkedSums(byteBuffer.array(), 0,
@@ -201,82 +339,47 @@ public class DFSStripedOutputStream extends DFSOutputStream {
}
@Override
- protected synchronized void writeChunk(byte[] b, int offset, int len,
+ protected synchronized void writeChunk(byte[] bytes, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
- super.writeChunk(b, offset, len, checksum, ckoff, cklen);
-
- if (getSizeOfCellnBuffer(curIdx) <= cellSize) {
- addToCellBuffer(b, offset, len);
- } else {
- String msg = "Writing a chunk should not overflow the cell buffer.";
- DFSClient.LOG.info(msg);
- throw new IOException(msg);
- }
-
- // If current packet has not been enqueued for transmission,
- // but the cell buffer is full, we need to enqueue the packet
- if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) {
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("DFSClient writeChunk cell buffer full seqno=" +
- currentPacket.getSeqno() +
- ", curIdx=" + curIdx +
- ", src=" + src +
- ", bytesCurBlock=" + streamer.getBytesCurBlock() +
- ", blockSize=" + blockSize +
- ", appendChunk=" + streamer.getAppendChunk());
+ final int index = getCurrentIndex();
+ final StripedDataStreamer current = getCurrentStreamer();
+ final int pos = cellBuffers.addTo(index, bytes, offset, len);
+ final boolean cellFull = pos == cellSize;
+
+ final long oldBytes = current.getBytesCurBlock();
+ if (!current.isFailed()) {
+ try {
+ super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
+
+ // cell is full and current packet has not been enqueued,
+ if (cellFull && currentPacket != null) {
+ enqueueCurrentPacketFull();
+ }
+ } catch(Exception e) {
+ handleStreamerFailure("offset=" + offset + ", length=" + len, e);
}
- streamer.waitAndQueuePacket(currentPacket);
- currentPacket = null;
- adjustChunkBoundary();
- endBlock();
+ }
+
+ if (current.isFailed()) {
+ final long newBytes = oldBytes + len;
+ coordinator.setBytesEndBlock(index, newBytes, current.getBlock());
+ current.setBytesCurBlock(newBytes);
}
// Two extra steps are needed when a striping cell is full:
// 1. Forward the current index pointer
// 2. Generate parity packets if a full stripe of data cells are present
- if (getSizeOfCellnBuffer(curIdx) == cellSize) {
- //move curIdx to next cell
- moveToNextStreamer();
+ if (cellFull) {
+ int next = index + 1;
//When all data cells in a stripe are ready, we need to encode
//them and generate some parity cells. These cells will be
//converted to packets and put to their DataStreamer's queue.
- if (curIdx == numDataBlocks) {
- //encode the data cells
- for (int k = 0; k < numDataBlocks; k++) {
- cellBuffers[k].flip();
- }
- encode(cellBuffers);
- for (int i = numDataBlocks; i < numAllBlocks; i++) {
- ByteBuffer parityBuffer = cellBuffers[i];
- List<DFSPacket> packets = generatePackets(parityBuffer);
- for (DFSPacket p : packets) {
- currentPacket = p;
- streamer.waitAndQueuePacket(currentPacket);
- currentPacket = null;
- }
- endBlock();
- moveToNextStreamer();
- }
- //read next stripe to cellBuffers
- clearCellBuffers();
- }
- }
- }
-
- private void addToCellBuffer(byte[] b, int off, int len) {
- cellBuffers[curIdx].put(b, off, len);
- }
-
- private int getSizeOfCellnBuffer(int cellIndex) {
- return cellBuffers[cellIndex].position();
- }
-
- private void clearCellBuffers() {
- for (int i = 0; i< numAllBlocks; i++) {
- cellBuffers[i].clear();
- if (i >= numDataBlocks) {
- Arrays.fill(cellBuffers[i].array(), (byte) 0);
+ if (next == numDataBlocks) {
+ cellBuffers.flipDataBuffers();
+ writeParityCells();
+ next = 0;
}
+ setCurrentStreamer(next);
}
}
@@ -284,20 +387,14 @@ public class DFSStripedOutputStream extends DFSOutputStream {
return numDataBlocks * cellSize;
}
- private void notSupported(String headMsg)
- throws IOException{
- throw new IOException(
- headMsg + " is now not supported for striping layout.");
- }
-
@Override
- public void hflush() throws IOException {
- notSupported("hflush");
+ public void hflush() {
+ throw new UnsupportedOperationException();
}
@Override
- public void hsync() throws IOException {
- notSupported("hsync");
+ public void hsync() {
+ throw new UnsupportedOperationException();
}
@Override
@@ -327,29 +424,28 @@ public class DFSStripedOutputStream extends DFSOutputStream {
return closed || getLeadingStreamer().streamerClosed();
}
- // shutdown datastreamer and responseprocessor threads.
- // interrupt datastreamer if force is true
@Override
protected void closeThreads(boolean force) throws IOException {
- int index = 0;
- boolean exceptionOccurred = false;
+ final MultipleIOException.Builder b = new MultipleIOException.Builder();
for (StripedDataStreamer streamer : streamers) {
try {
streamer.close(force);
streamer.join();
streamer.closeSocket();
- } catch (InterruptedException | IOException e) {
- DFSClient.LOG.error("Failed to shutdown streamer: name="
- + streamer.getName() + ", index=" + index + ", file=" + src, e);
- exceptionOccurred = true;
+ } catch(Exception e) {
+ try {
+ handleStreamerFailure("force=" + force, e);
+ } catch(IOException ioe) {
+ b.add(ioe);
+ }
} finally {
streamer.setSocketToNull();
setClosed();
- index++;
}
}
- if (exceptionOccurred) {
- throw new IOException("Failed to shutdown streamer");
+ final IOException ioe = b.build();
+ if (ioe != null) {
+ throw ioe;
}
}
@@ -370,50 +466,69 @@ public class DFSStripedOutputStream extends DFSOutputStream {
if (currentBlockGroupBytes % stripeDataSize() == 0) {
return;
}
- long firstCellSize = getLeadingStreamer().getBytesCurBlock() % cellSize;
- long parityCellSize = firstCellSize > 0 && firstCellSize < cellSize ?
+
+ final int firstCellSize = (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
+ final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize?
firstCellSize : cellSize;
+ final ByteBuffer[] buffers = cellBuffers.getBuffers();
for (int i = 0; i < numAllBlocks; i++) {
// Pad zero bytes to make all cells exactly the size of parityCellSize
// If internal block is smaller than parity block, pad zero bytes.
// Also pad zero bytes to all parity cells
- int position = cellBuffers[i].position();
+ final int position = buffers[i].position();
assert position <= parityCellSize : "If an internal block is smaller" +
" than parity block, then its last cell should be small than last" +
" parity cell";
for (int j = 0; j < parityCellSize - position; j++) {
- cellBuffers[i].put((byte) 0);
+ buffers[i].put((byte) 0);
}
- cellBuffers[i].flip();
+ buffers[i].flip();
}
- encode(cellBuffers);
- // write parity cells
- curIdx = numDataBlocks;
- refreshStreamer();
+ writeParityCells();
+ }
+
+ void writeParityCells() throws IOException {
+ final ByteBuffer[] buffers = cellBuffers.getBuffers();
+ //encode the data cells
+ encode(encoder, numDataBlocks, buffers);
for (int i = numDataBlocks; i < numAllBlocks; i++) {
- ByteBuffer parityBuffer = cellBuffers[i];
- List<DFSPacket> packets = generatePackets(parityBuffer);
- for (DFSPacket p : packets) {
- currentPacket = p;
- streamer.waitAndQueuePacket(currentPacket);
- currentPacket = null;
+ writeParity(i, buffers[i], cellBuffers.getChecksumArray(i));
+ }
+ cellBuffers.clear();
+ }
+
+ void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf
+ ) throws IOException {
+ final StripedDataStreamer current = setCurrentStreamer(index);
+ final int len = buffer.limit();
+
+ final long oldBytes = current.getBytesCurBlock();
+ if (!current.isFailed()) {
+ try {
+ for (DFSPacket p : generatePackets(buffer, checksumBuf)) {
+ streamer.waitAndQueuePacket(p);
+ }
+ endBlock();
+ } catch(Exception e) {
+ handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
}
- endBlock();
- moveToNextStreamer();
}
- clearCellBuffers();
+ if (current.isFailed()) {
+ final long newBytes = oldBytes + len;
+ current.setBytesCurBlock(newBytes);
+ }
}
@Override
void setClosed() {
super.setClosed();
for (int i = 0; i < numAllBlocks; i++) {
- byteArrayManager.release(cellBuffers[i].array());
streamers.get(i).release();
}
+ cellBuffers.release();
}
@Override
@@ -425,25 +540,31 @@ public class DFSStripedOutputStream extends DFSOutputStream {
try {
// flush from all upper layers
- flushBuffer();
- if (currentPacket != null) {
- streamer.waitAndQueuePacket(currentPacket);
- currentPacket = null;
+ try {
+ flushBuffer();
+ if (currentPacket != null) {
+ enqueueCurrentPacket();
+ }
+ } catch(Exception e) {
+ handleStreamerFailure("closeImpl", e);
}
+
// if the last stripe is incomplete, generate and write parity cells
writeParityCellsForLastStripe();
for (int i = 0; i < numAllBlocks; i++) {
- curIdx = i;
- refreshStreamer();
- if (streamer.getBytesCurBlock() > 0) {
- // send an empty packet to mark the end of the block
- currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
- streamer.getAndIncCurrentSeqno(), true);
- currentPacket.setSyncBlock(shouldSyncBlock);
+ final StripedDataStreamer s = setCurrentStreamer(i);
+ if (!s.isFailed()) {
+ try {
+ if (s.getBytesCurBlock() > 0) {
+ setCurrentPacket2Empty();
+ }
+ // flush all data to Datanode
+ flushInternal();
+ } catch(Exception e) {
+ handleStreamerFailure("closeImpl", e);
+ }
}
- // flush all data to Datanode
- flushInternal();
}
closeThreads(false);
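The Coordinator introduced above is, at its core, one single-slot bounded queue per streamer: the leading streamer parses each allocated block group and offers one internal block into every peer's slot, and each streamer polls only its own slot with a timeout. A self-contained sketch of that hand-off under simplified types (the real code passes LocatedBlock and polls for 90 seconds):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    class BlockHandoff {
      private final List<BlockingQueue<String>> slots = new ArrayList<>();

      BlockHandoff(int numStreamers) {
        for (int i = 0; i < numStreamers; i++) {
          slots.add(new LinkedBlockingQueue<String>(1)); // capacity 1, like stripedBlocks
        }
      }

      /** Leading streamer: distribute one block per streamer; offer() must succeed. */
      void put(int i, String block) {
        if (!slots.get(i).offer(block)) {
          throw new IllegalStateException("Failed: " + block + ", i=" + i);
        }
      }

      /** Each streamer: wait (bounded) for its own slot to be filled. */
      String get(int i) throws InterruptedException {
        return slots.get(i).poll(90, TimeUnit.SECONDS);
      }
    }

The capacity-1 queues make any protocol bug fail fast: a second offer() before the consumer has polled means the streamers are out of step.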
http://git-wip-us.apache.org/repos/asf/hadoop/blob/220ca960/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index cae56c0..2e2ecfd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -36,6 +36,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PAS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
@@ -55,7 +56,6 @@ import java.util.concurrent.ThreadLocalRandom;
import javax.net.SocketFactory;
-import com.google.common.collect.Sets;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
@@ -96,6 +96,7 @@ import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.protobuf.BlockingService;
@InterfaceAudience.Private
@@ -1513,7 +1514,7 @@ public class DFSUtil {
public static int getSmallBufferSize(Configuration conf) {
return Math.min(getIoFileBufferSize(conf) / 2, 512);
}
-
+
/**
* Probe for HDFS Encryption being enabled; this uses the value of
* the option {@link DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI},
@@ -1527,4 +1528,10 @@ public class DFSUtil {
DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty();
}
+ public static InterruptedIOException toInterruptedIOException(String message,
+ InterruptedException e) {
+ final InterruptedIOException iioe = new InterruptedIOException(message);
+ iioe.initCause(e);
+ return iioe;
+ }
}
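The new DFSUtil helper just packages the usual initCause boilerplate for turning an InterruptedException into an IOException subtype. A hedged sketch of a typical call site, wrapped as a standalone method for illustration (pollOrThrow is hypothetical; the real callers are the Coordinator methods above):

    import java.io.InterruptedIOException;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    class InterruptExample {
      static <T> T pollOrThrow(BlockingQueue<T> queue, String op)
          throws InterruptedIOException {
        try {
          return queue.poll(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
          // Same pattern as DFSUtil.toInterruptedIOException: preserve the cause chain
          // so the interrupt stays visible to callers that only catch IOException.
          final InterruptedIOException iioe =
              new InterruptedIOException(op + " interrupted");
          iioe.initCause(e);
          throw iioe;
        }
      }
    }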
http://git-wip-us.apache.org/repos/asf/hadoop/blob/220ca960/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 631f386..8f07341 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -575,7 +575,7 @@ class DataStreamer extends Daemon {
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Allocating new block");
+ LOG.debug("Allocating new block " + this);
}
setPipeline(nextBlockOutputStream());
initDataStreaming();
@@ -593,10 +593,7 @@ class DataStreamer extends Daemon {
long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
if (lastByteOffsetInBlock > stat.getBlockSize()) {
throw new IOException("BlockSize " + stat.getBlockSize() +
- " is smaller than data size. " +
- " Offset of packet in block " +
- lastByteOffsetInBlock +
- " Aborting file " + src);
+ " < lastByteOffsetInBlock, " + this + ", " + one);
}
if (one.isLastPacketInBlock()) {
@@ -1751,7 +1748,7 @@ class DataStreamer extends Daemon {
dataQueue.addLast(packet);
lastQueuedSeqno = packet.getSeqno();
if (LOG.isDebugEnabled()) {
- LOG.debug("Queued packet " + packet.getSeqno());
+ LOG.debug("Queued " + packet + ", " + this);
}
dataQueue.notifyAll();
}
@@ -1901,4 +1898,10 @@ class DataStreamer extends Daemon {
s.close();
}
}
+
+ @Override
+ public String toString() {
+ return (block == null? null: block.getLocalBlock())
+ + "@" + Arrays.toString(getNodes());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/220ca960/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index ef7e2a6..258fc65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -18,8 +18,14 @@
package org.apache.hadoop.hdfs;
-import java.util.List;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -31,15 +37,6 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
-
/****************************************************************************
* The StripedDataStreamer class is used by {@link DFSStripedOutputStream}.
* There are two kinds of StripedDataStreamer, leading streamer and ordinary
@@ -49,40 +46,32 @@ import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
*
****************************************************************************/
public class StripedDataStreamer extends DataStreamer {
- private final short index;
- private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
- private boolean hasCommittedBlock = false;
+ private final Coordinator coordinator;
+ private final int index;
+ private volatile boolean isFailed;
- StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block,
+ StripedDataStreamer(HdfsFileStatus stat,
DFSClient dfsClient, String src,
Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy,
- ByteArrayManager byteArrayManage, short index,
- List<BlockingQueue<LocatedBlock>> stripedBlocks,
- String[] favoredNodes) {
- super(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
+ ByteArrayManager byteArrayManage, String[] favoredNodes,
+ short index, Coordinator coordinator) {
+ super(stat, null, dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage, favoredNodes);
this.index = index;
- this.stripedBlocks = stripedBlocks;
+ this.coordinator = coordinator;
}
- /**
- * Construct a data streamer for appending to the last partial block
- * @param lastBlock last block of the file to be appended
- * @param stat status of the file to be appended
- * @throws IOException if error occurs
- */
- StripedDataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
- DFSClient dfsClient, String src,
- Progressable progress, DataChecksum checksum,
- AtomicReference<CachingStrategy> cachingStrategy,
- ByteArrayManager byteArrayManage, short index,
- List<BlockingQueue<LocatedBlock>> stripedBlocks)
- throws IOException {
- super(lastBlock, stat, dfsClient, src, progress, checksum, cachingStrategy,
- byteArrayManage);
- this.index = index;
- this.stripedBlocks = stripedBlocks;
+ int getIndex() {
+ return index;
+ }
+
+ void setIsFailed(boolean isFailed) {
+ this.isFailed = isFailed;
+ }
+
+ boolean isFailed() {
+ return isFailed;
}
public boolean isLeadingStreamer () {
@@ -95,18 +84,8 @@ public class StripedDataStreamer extends DataStreamer {
@Override
protected void endBlock() {
- if (!isLeadingStreamer() && !isParityStreamer()) {
- // before retrieving a new block, transfer the finished block to
- // leading streamer
- LocatedBlock finishedBlock = new LocatedBlock(
- new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(),
- block.getNumBytes(), block.getGenerationStamp()), null);
- try {
- boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
- TimeUnit.SECONDS);
- } catch (InterruptedException ie) {
- // TODO: Handle InterruptedException (HDFS-7786)
- }
+ if (!isParityStreamer()) {
+ coordinator.putEndBlock(index, block);
}
super.endBlock();
}
@@ -114,71 +93,40 @@ public class StripedDataStreamer extends DataStreamer {
@Override
protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException {
- LocatedBlock lb = null;
if (isLeadingStreamer()) {
- if (hasCommittedBlock) {
- /**
- * when committing a block group, leading streamer has to adjust
- * {@link block} to include the size of block group
- */
- for (int i = 1; i < NUM_DATA_BLOCKS; i++) {
- try {
- LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
- TimeUnit.SECONDS);
- if (finishedLocatedBlock == null) {
- throw new IOException("Fail to get finished LocatedBlock " +
- "from streamer, i=" + i);
- }
- ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
- long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
- if (block != null) {
- block.setNumBytes(block.getNumBytes() + bytes);
- }
- } catch (InterruptedException ie) {
- DFSClient.LOG.info("InterruptedException received when putting" +
- " a block to stripeBlocks, ie = " + ie);
- }
+ if (coordinator.shouldLocateFollowingBlock()) {
+ // set numByte for the previous block group
+ long bytes = 0;
+ for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
+ final ExtendedBlock b = coordinator.getEndBlock(i);
+ bytes += b == null ? 0 : b.getNumBytes();
}
+ block.setNumBytes(bytes);
}
- lb = super.locateFollowingBlock(excludedNodes);
- hasCommittedBlock = true;
- assert lb instanceof LocatedStripedBlock;
- DFSClient.LOG.debug("Leading streamer obtained bg " + lb);
- LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
- (LocatedStripedBlock) lb, BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS,
- NUM_PARITY_BLOCKS);
+ final LocatedStripedBlock lsb
+ = (LocatedStripedBlock)super.locateFollowingBlock(excludedNodes);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Obtained block group " + lsb);
+ }
+ LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(lsb,
+ BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+
assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) :
"Fail to get block group from namenode: blockGroupSize: " +
(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " +
blocks.length;
- lb = blocks[0];
- for (int i = 1; i < blocks.length; i++) {
- try {
- boolean offSuccess = stripedBlocks.get(i).offer(blocks[i],
- 90, TimeUnit.SECONDS);
- if(!offSuccess){
- String msg = "Fail to put block to stripeBlocks. i = " + i;
- DFSClient.LOG.info(msg);
- throw new IOException(msg);
- } else {
- DFSClient.LOG.info("Allocate a new block to a streamer. i = " + i
- + ", block: " + blocks[i]);
- }
- } catch (InterruptedException ie) {
- DFSClient.LOG.info("InterruptedException received when putting" +
- " a block to stripeBlocks, ie = " + ie);
- }
- }
- } else {
- try {
- // wait 90 seconds to get a block from the queue
- lb = stripedBlocks.get(index).poll(90, TimeUnit.SECONDS);
- } catch (InterruptedException ie) {
- DFSClient.LOG.info("InterruptedException received when retrieving " +
- "a block from stripeBlocks, ie = " + ie);
+ for (int i = 0; i < blocks.length; i++) {
+ coordinator.putStripedBlock(i, blocks[i]);
}
}
- return lb;
+
+ return coordinator.getStripedBlock(index);
+ }
+
+ @Override
+ public String toString() {
+ return "#" + index + ": isFailed? " + Boolean.toString(isFailed).charAt(0)
+ + ", " + super.toString();
}
}
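Tying the two classes together: before the leading streamer asks the NameNode for the next block group, it drains every data streamer's end-block slot and sums the reported sizes, so the previous group is committed with its true length even when some streamers failed (a failed slot yields null and contributes zero bytes). A simplified, self-contained sketch of that aggregation (the real code sums ExtendedBlock.getNumBytes() via Coordinator.getEndBlock):

    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    class BlockGroupSize {
      /** Sum the finished sizes reported by the data streamers; null means a failed slot. */
      static long commitSize(List<BlockingQueue<Long>> endBlocks, int numDataBlocks)
          throws InterruptedException {
        long bytes = 0;
        for (int i = 0; i < numDataBlocks; i++) {
          final Long b = endBlocks.get(i).poll(30, TimeUnit.SECONDS);
          bytes += b == null ? 0 : b;
        }
        return bytes;
      }
    }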
http://git-wip-us.apache.org/repos/asf/hadoop/blob/220ca960/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 7392552..8f843d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -390,7 +390,7 @@ public class FSDirectory implements Closeable {
void disableQuotaChecks() {
skipQuotaCheck = true;
}
-
+
/**
* This is a wrapper for resolvePath(). If the path passed
* is prefixed with /.reserved/raw, then it checks to ensure that the caller
http://git-wip-us.apache.org/repos/asf/hadoop/blob/220ca960/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index fdbacdc..4ec9bf9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1998,8 +1998,6 @@ public class MiniDFSCluster {
int node = -1;
for (int i = 0; i < dataNodes.size(); i++) {
DataNode dn = dataNodes.get(i).datanode;
- LOG.info("DN name=" + dnName + " found DN=" + dn +
- " with name=" + dn.getDisplayName());
if (dnName.equals(dn.getDatanodeId().getXferAddr())) {
node = i;
break;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/220ca960/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index 5ce94ee..ec98e68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -42,6 +44,12 @@ import org.junit.Test;
public class TestDFSStripedOutputStream {
public static final Log LOG = LogFactory.getLog(TestDFSStripedOutputStream.class);
+
+ static {
+ GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+ }
+
private int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
@@ -245,6 +253,11 @@ public class TestDFSStripedOutputStream {
static void verifyParity(final long size, final int cellSize,
byte[][] dataBytes, byte[][] parityBytes) {
+ verifyParity(size, cellSize, dataBytes, parityBytes, -1);
+ }
+
+ static void verifyParity(final long size, final int cellSize,
+ byte[][] dataBytes, byte[][] parityBytes, int killedDnIndex) {
// verify the parity blocks
int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
size, cellSize, dataBytes.length, dataBytes.length);
@@ -265,7 +278,10 @@ public class TestDFSStripedOutputStream {
encoder.initialize(dataBytes.length, parityBytes.length, cellSize);
encoder.encode(dataBytes, expectedParityBytes);
for (int i = 0; i < parityBytes.length; i++) {
- Assert.assertArrayEquals(expectedParityBytes[i], parityBytes[i]);
+ if (i != killedDnIndex) {
+ Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + killedDnIndex,
+ expectedParityBytes[i], parityBytes[i]);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/220ca960/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
new file mode 100644
index 0000000..c2e588a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDFSStripedOutputStreamWithFailure {
+ public static final Log LOG = LogFactory.getLog(
+ TestDFSStripedOutputStreamWithFailure.class);
+ static {
+ GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+ }
+
+ private static final int NUM_DATA_BLOCKS = HdfsConstants.NUM_DATA_BLOCKS;
+ private static final int NUM_PARITY_BLOCKS = HdfsConstants.NUM_PARITY_BLOCKS;
+ private static final int CELL_SIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private static final int STRIPES_PER_BLOCK = 4;
+ private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
+ private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;
+
+ private final HdfsConfiguration conf = new HdfsConfiguration();
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem dfs;
+ private final Path dir = new Path("/"
+ + TestDFSStripedOutputStreamWithFailure.class.getSimpleName());
+
+
+ @Before
+ public void setup() throws IOException {
+ final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+ cluster.waitActive();
+ dfs = cluster.getFileSystem();
+ dfs.mkdirs(dir);
+ dfs.createErasureCodingZone(dir, null);
+ }
+
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ private static byte getByte(long pos) {
+ return (byte)pos;
+ }
+
+ @Test(timeout=120000)
+ public void testDatanodeFailure1() {
+ final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+ final int dn = 1;
+ runTest("file" + dn, length, dn);
+ }
+
+ @Test(timeout=120000)
+ public void testDatanodeFailure2() {
+ final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+ final int dn = 2;
+ runTest("file" + dn, length, dn);
+ }
+
+ @Test(timeout=120000)
+ public void testDatanodeFailure3() {
+ final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+ final int dn = 3;
+ runTest("file" + dn, length, dn);
+ }
+
+ @Test(timeout=120000)
+ public void testDatanodeFailure4() {
+ final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+ final int dn = 4;
+ runTest("file" + dn, length, dn);
+ }
+
+ @Test(timeout=120000)
+ public void testDatanodeFailure5() {
+ final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+ final int dn = 5;
+ runTest("file" + dn, length, dn);
+ }
+
+ @Test(timeout=120000)
+ public void testDatanodeFailure6() {
+ final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+ final int dn = 6;
+ runTest("file" + dn, length, dn);
+ }
+
+ @Test(timeout=120000)
+ public void testDatanodeFailure7() {
+ final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+ final int dn = 7;
+ runTest("file" + dn, length, dn);
+ }
+
+ @Test(timeout=120000)
+ public void testDatanodeFailure8() {
+ final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+ final int dn = 8;
+ runTest("file" + dn, length, dn);
+ }
+
+ private void runTest(final String src, final int length, final int dnIndex) {
+ try {
+ cluster.startDataNodes(conf, 1, true, null, null);
+ cluster.waitActive();
+
+ runTest(new Path(dir, src), length, dnIndex);
+ } catch(Exception e) {
+ LOG.info("FAILED", e);
+ Assert.fail(StringUtils.stringifyException(e));
+ }
+ }
+
+ private void runTest(final Path p, final int length,
+ final int dnIndex) throws Exception {
+ LOG.info("p=" + p + ", length=" + length + ", dnIndex=" + dnIndex);
+ final String fullPath = p.toString();
+
+ final AtomicInteger pos = new AtomicInteger();
+ final FSDataOutputStream out = dfs.create(p);
+ final AtomicBoolean killed = new AtomicBoolean();
+ final Thread killer = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ killDatanode(cluster, (DFSStripedOutputStream)out.getWrappedStream(),
+ dnIndex, pos);
+ killed.set(true);
+ }
+ });
+ killer.start();
+
+ final int mask = (1 << 16) - 1;
+ for(; pos.get() < length; ) {
+ final int i = pos.getAndIncrement();
+ write(out, i);
+ if ((i & mask) == 0) {
+ final long ms = 100;
+ LOG.info("i=" + i + " sleep " + ms);
+ Thread.sleep(ms);
+ }
+ }
+ killer.join(10000);
+ Assert.assertTrue(killed.get());
+ out.close();
+
+ // check file length
+ final FileStatus status = dfs.getFileStatus(p);
+ Assert.assertEquals(length, status.getLen());
+
+ checkData(dfs, fullPath, length, dnIndex);
+ }
+
+ static void write(FSDataOutputStream out, int i) throws IOException {
+ try {
+ out.write(getByte(i));
+ } catch(IOException ioe) {
+ throw new IOException("Failed at i=" + i, ioe);
+ }
+ }
+
+ static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
+ for(;;) {
+ final DatanodeInfo[] datanodes = streamer.getNodes();
+ if (datanodes != null) {
+ Assert.assertEquals(1, datanodes.length);
+ Assert.assertNotNull(datanodes[0]);
+ return datanodes[0];
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ return null;
+ }
+ }
+ }
+
+ static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out,
+ final int dnIndex, final AtomicInteger pos) {
+ final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
+ final DatanodeInfo datanode = getDatanodes(s);
+ LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
+ cluster.stopDataNode(datanode.getXferAddr());
+ }
+
+ static void checkData(DistributedFileSystem dfs, String src, int length,
+ int killedDnIndex) throws IOException {
+ List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
+ LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(src, 0L);
+ final int expectedNumGroup = (length - 1)/BLOCK_GROUP_SIZE + 1;
+ Assert.assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
+
+ for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
+ Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
+ LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+ (LocatedStripedBlock) firstBlock,
+ CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+ blockGroupList.add(Arrays.asList(blocks));
+ }
+
+ // test each block group
+ for (int group = 0; group < blockGroupList.size(); group++) {
+ final boolean isLastGroup = group == blockGroupList.size() - 1;
+ final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE
+ : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE;
+ final int numCellInGroup = (int)((groupSize - 1)/CELL_SIZE + 1);
+ final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS;
+ final int lastCellSize = groupSize - (numCellInGroup - 1)*CELL_SIZE;
+
+ //get the data of this block
+ List<LocatedBlock> blockList = blockGroupList.get(group);
+ byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
+ byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
+
+ // for each block, use BlockReader to read data
+ for (int i = 0; i < blockList.size(); i++) {
+ final int j = i >= NUM_DATA_BLOCKS? 0: i;
+ final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS
+ + (j <= lastCellIndex? 1: 0);
+ final int blockSize = numCellInBlock*CELL_SIZE
+ + (isLastGroup && i == lastCellIndex? lastCellSize - CELL_SIZE: 0);
+
+ final byte[] blockBytes = new byte[blockSize];
+ if (i < NUM_DATA_BLOCKS) {
+ dataBlockBytes[i] = blockBytes;
+ } else {
+ parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes;
+ }
+
+ final LocatedBlock lb = blockList.get(i);
+ LOG.info("XXX i=" + i + ", lb=" + lb);
+ if (lb == null) {
+ continue;
+ }
+ final ExtendedBlock block = lb.getBlock();
+ Assert.assertEquals(blockSize, block.getNumBytes());
+
+
+ if (block.getNumBytes() == 0) {
+ continue;
+ }
+
+ if (i != killedDnIndex) {
+ final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
+ dfs, lb, 0, block.getNumBytes());
+ blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
+ blockReader.close();
+ }
+ }
+
+ // check data
+ final int groupPosInFile = group*BLOCK_GROUP_SIZE;
+ for (int i = 0; i < dataBlockBytes.length; i++) {
+ final byte[] actual = dataBlockBytes[i];
+ for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) {
+ final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(
+ CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile;
+ Assert.assertTrue(posInFile < length);
+ final byte expected = getByte(posInFile);
+
+ if (i == killedDnIndex) {
+ actual[posInBlk] = expected;
+ } else {
+ String s = "expected=" + expected + " but actual=" + actual[posInBlk]
+ + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk
+ + ". group=" + group + ", i=" + i;
+ Assert.assertEquals(s, expected, actual[posInBlk]);
+ }
+ }
+ }
+
+ // check parity
+ TestDFSStripedOutputStream.verifyParity(
+ lbs.getLocatedBlocks().get(group).getBlockSize(),
+ CELL_SIZE, dataBlockBytes, parityBlockBytes,
+ killedDnIndex - dataBlockBytes.length);
+ }
+ }
+}