You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/07/14 22:11:42 UTC
hadoop git commit: HDFS-8734. Erasure Coding: fix one cell need two
packets. Contributed by Walter Su.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7285 6ff957be8 -> 0a93712f3
HDFS-8734. Erasure Coding: fix one cell need two packets. Contributed by Walter Su.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0a93712f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0a93712f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0a93712f
Branch: refs/heads/HDFS-7285
Commit: 0a93712f3b9b36d746577dca5da0f7f09756fcca
Parents: 6ff957b
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Jul 14 13:10:51 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Jul 14 13:10:51 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 2 +-
.../java/org/apache/hadoop/hdfs/DFSPacket.java | 6 +-
.../hadoop/hdfs/DFSStripedOutputStream.java | 85 ++++++++------------
4 files changed, 41 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a93712f/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 2b91295..93c3162 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -350,3 +350,6 @@
HDFS-8702. Erasure coding: update BlockManager.blockHasEnoughRacks(..) logic
for striped block. (Kai Sasaki via jing9)
+
+ HDFS-8734. Erasure Coding: fix one cell need two packets. (Walter Su via
+ jing9)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a93712f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index f41044b..9e201ad 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -419,7 +419,7 @@ public class DFSOutputStream extends FSOutputSummer
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
- currentPacket.incNumChunks(1);
+ currentPacket.incNumChunks();
streamer.incBytesCurBlock(len);
// If packet is full, enqueue it for transmission
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a93712f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
index 2698de3..a26e35e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -259,10 +259,10 @@ public class DFSPacket {
}
/**
- * increase the number of chunks by n
+ * increase the number of chunks by one
*/
- synchronized void incNumChunks(int n) {
- numChunks += n;
+ synchronized void incNumChunks() {
+ numChunks++;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a93712f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index e6de714..a4bb49d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -254,6 +254,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
private final CellBuffers cellBuffers;
private final RawErasureEncoder encoder;
private final List<StripedDataStreamer> streamers;
+ private final DFSPacket[] currentPackets; // current packet of each streamer
/** Size of each striping cell, must be a multiple of bytesPerChecksum */
private final int cellSize;
@@ -301,6 +302,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
s.add(streamer);
}
streamers = Collections.unmodifiableList(s);
+ currentPackets = new DFSPacket[streamers.size()];
setCurrentStreamer(0);
}
@@ -316,9 +318,18 @@ public class DFSStripedOutputStream extends DFSOutputStream {
return (StripedDataStreamer)streamer;
}
- private synchronized StripedDataStreamer setCurrentStreamer(int i) {
- streamer = streamers.get(i);
+ private synchronized StripedDataStreamer setCurrentStreamer(int newIdx)
+ throws IOException {
+ // back up currentPacket for the current streamer
+ int oldIdx = streamers.indexOf(streamer);
+ if (oldIdx >= 0) {
+ currentPackets[oldIdx] = currentPacket;
+ }
+
+ streamer = streamers.get(newIdx);
+ currentPacket = currentPackets[newIdx];
adjustChunkBoundary();
+
return getCurrentStreamer();
}
@@ -366,41 +377,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
currentPacket = null;
}
- /**
- * Generate packets from a given buffer. This is only used for streamers
- * writing parity blocks.
- *
- * @param byteBuffer the given buffer to generate packets
- * @param checksumBuf the checksum buffer
- * @return packets generated
- * @throws IOException
- */
- private List<DFSPacket> generatePackets(
- ByteBuffer byteBuffer, byte[] checksumBuf) throws IOException{
- List<DFSPacket> packets = new ArrayList<>();
- assert byteBuffer.hasArray();
- getDataChecksum().calculateChunkedSums(byteBuffer.array(), 0,
- byteBuffer.remaining(), checksumBuf, 0);
- int ckOff = 0;
- while (byteBuffer.remaining() > 0) {
- DFSPacket p = createPacket(packetSize, chunksPerPacket,
- getCurrentStreamer().getBytesCurBlock(),
- getCurrentStreamer().getAndIncCurrentSeqno(), false);
- int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum;
- int toWrite = byteBuffer.remaining() > maxBytesToPacket ?
- maxBytesToPacket: byteBuffer.remaining();
- int chunks = (toWrite - 1) / bytesPerChecksum + 1;
- int ckLen = chunks * getChecksumSize();
- p.writeChecksum(checksumBuf, ckOff, ckLen);
- ckOff += ckLen;
- p.writeData(byteBuffer, toWrite);
- getCurrentStreamer().incBytesCurBlock(toWrite);
- p.incNumChunks(chunks);
- packets.add(p);
- }
- return packets;
- }
-
@Override
protected synchronized void writeChunk(byte[] bytes, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
@@ -413,11 +389,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
if (!current.isFailed()) {
try {
super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
-
- // cell is full and current packet has not been enqueued,
- if (cellFull && currentPacket != null) {
- enqueueCurrentPacketFull();
- }
} catch(Exception e) {
handleStreamerFailure("offset=" + offset + ", length=" + len, e);
}
@@ -581,10 +552,14 @@ public class DFSStripedOutputStream extends DFSOutputStream {
final long oldBytes = current.getBytesCurBlock();
if (!current.isFailed()) {
try {
- for (DFSPacket p : generatePackets(buffer, checksumBuf)) {
- getCurrentStreamer().waitAndQueuePacket(p);
+ DataChecksum sum = getDataChecksum();
+ sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
+ for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
+ int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
+ int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
+ super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset,
+ getChecksumSize());
}
- endBlock();
} catch(Exception e) {
handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
}
@@ -628,16 +603,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
// flush from all upper layers
try {
flushBuffer();
- if (currentPacket != null) {
- enqueueCurrentPacket();
- }
+ // if the last stripe is incomplete, generate and write parity cells
+ writeParityCellsForLastStripe();
+ enqueueAllCurrentPackets();
} catch(Exception e) {
handleStreamerFailure("closeImpl", e);
}
- // if the last stripe is incomplete, generate and write parity cells
- writeParityCellsForLastStripe();
-
for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i);
if (!s.isFailed()) {
@@ -667,4 +639,15 @@ public class DFSStripedOutputStream extends DFSOutputStream {
setClosed();
}
}
+
+ private void enqueueAllCurrentPackets() throws IOException {
+ int idx = streamers.indexOf(getCurrentStreamer());
+ for(int i = 0; i < streamers.size(); i++) {
+ setCurrentStreamer(i);
+ if (currentPacket != null) {
+ enqueueCurrentPacket();
+ }
+ }
+ setCurrentStreamer(idx);
+ }
}