You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/07/14 22:11:42 UTC

hadoop git commit: HDFS-8734. Erasure Coding: fix one cell need two packets. Contributed by Walter Su.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 6ff957be8 -> 0a93712f3


HDFS-8734. Erasure Coding: fix one cell need two packets. Contributed by Walter Su.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0a93712f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0a93712f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0a93712f

Branch: refs/heads/HDFS-7285
Commit: 0a93712f3b9b36d746577dca5da0f7f09756fcca
Parents: 6ff957b
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Jul 14 13:10:51 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Jul 14 13:10:51 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |  3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  2 +-
 .../java/org/apache/hadoop/hdfs/DFSPacket.java  |  6 +-
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 85 ++++++++------------
 4 files changed, 41 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a93712f/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 2b91295..93c3162 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -350,3 +350,6 @@
 
     HDFS-8702. Erasure coding: update BlockManager.blockHasEnoughRacks(..) logic
     for striped block. (Kai Sasaki via jing9)
+
+    HDFS-8734. Erasure Coding: fix one cell need two packets. (Walter Su via
+    jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a93712f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index f41044b..9e201ad 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -419,7 +419,7 @@ public class DFSOutputStream extends FSOutputSummer
 
     currentPacket.writeChecksum(checksum, ckoff, cklen);
     currentPacket.writeData(b, offset, len);
-    currentPacket.incNumChunks(1);
+    currentPacket.incNumChunks();
     streamer.incBytesCurBlock(len);
 
     // If packet is full, enqueue it for transmission

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a93712f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
index 2698de3..a26e35e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -259,10 +259,10 @@ public class DFSPacket {
   }
 
   /**
-   * increase the number of chunks by n
+   * increase the number of chunks by one
    */
-  synchronized void incNumChunks(int n) {
-    numChunks += n;
+  synchronized void incNumChunks() {
+    numChunks++;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a93712f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index e6de714..a4bb49d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -254,6 +254,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   private final CellBuffers cellBuffers;
   private final RawErasureEncoder encoder;
   private final List<StripedDataStreamer> streamers;
+  private final DFSPacket[] currentPackets; // current Packet of each streamer
 
   /** Size of each striping cell, must be a multiple of bytesPerChecksum */
   private final int cellSize;
@@ -301,6 +302,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       s.add(streamer);
     }
     streamers = Collections.unmodifiableList(s);
+    currentPackets = new DFSPacket[streamers.size()];
     setCurrentStreamer(0);
   }
 
@@ -316,9 +318,18 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     return (StripedDataStreamer)streamer;
   }
 
-  private synchronized StripedDataStreamer setCurrentStreamer(int i) {
-    streamer = streamers.get(i);
+  private synchronized StripedDataStreamer setCurrentStreamer(int newIdx)
+      throws IOException {
+    // backup currentPacket for current streamer
+    int oldIdx = streamers.indexOf(streamer);
+    if (oldIdx >= 0) {
+      currentPackets[oldIdx] = currentPacket;
+    }
+
+    streamer = streamers.get(newIdx);
+    currentPacket = currentPackets[newIdx];
     adjustChunkBoundary();
+
     return getCurrentStreamer();
   }
 
@@ -366,41 +377,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     currentPacket = null;
   }
 
-  /**
-   * Generate packets from a given buffer. This is only used for streamers
-   * writing parity blocks.
-   *
-   * @param byteBuffer the given buffer to generate packets
-   * @param checksumBuf the checksum buffer
-   * @return packets generated
-   * @throws IOException
-   */
-  private List<DFSPacket> generatePackets(
-      ByteBuffer byteBuffer, byte[] checksumBuf) throws IOException{
-    List<DFSPacket> packets = new ArrayList<>();
-    assert byteBuffer.hasArray();
-    getDataChecksum().calculateChunkedSums(byteBuffer.array(), 0,
-        byteBuffer.remaining(), checksumBuf, 0);
-    int ckOff = 0;
-    while (byteBuffer.remaining() > 0) {
-      DFSPacket p = createPacket(packetSize, chunksPerPacket,
-          getCurrentStreamer().getBytesCurBlock(),
-          getCurrentStreamer().getAndIncCurrentSeqno(), false);
-      int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum;
-      int toWrite = byteBuffer.remaining() > maxBytesToPacket ?
-          maxBytesToPacket: byteBuffer.remaining();
-      int chunks = (toWrite - 1) / bytesPerChecksum + 1;
-      int ckLen = chunks * getChecksumSize();
-      p.writeChecksum(checksumBuf, ckOff, ckLen);
-      ckOff += ckLen;
-      p.writeData(byteBuffer, toWrite);
-      getCurrentStreamer().incBytesCurBlock(toWrite);
-      p.incNumChunks(chunks);
-      packets.add(p);
-    }
-    return packets;
-  }
-
   @Override
   protected synchronized void writeChunk(byte[] bytes, int offset, int len,
       byte[] checksum, int ckoff, int cklen) throws IOException {
@@ -413,11 +389,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     if (!current.isFailed()) {
       try {
         super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
-
-        // cell is full and current packet has not been enqueued,
-        if (cellFull && currentPacket != null) {
-          enqueueCurrentPacketFull();
-        }
       } catch(Exception e) {
         handleStreamerFailure("offset=" + offset + ", length=" + len, e);
       }
@@ -581,10 +552,14 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     final long oldBytes = current.getBytesCurBlock();
     if (!current.isFailed()) {
       try {
-        for (DFSPacket p : generatePackets(buffer, checksumBuf)) {
-          getCurrentStreamer().waitAndQueuePacket(p);
+        DataChecksum sum = getDataChecksum();
+        sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
+        for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
+          int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
+          int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
+          super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset,
+              getChecksumSize());
         }
-        endBlock();
       } catch(Exception e) {
         handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
       }
@@ -628,16 +603,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       // flush from all upper layers
       try {
         flushBuffer();
-        if (currentPacket != null) {
-          enqueueCurrentPacket();
-        }
+        // if the last stripe is incomplete, generate and write parity cells
+        writeParityCellsForLastStripe();
+        enqueueAllCurrentPackets();
       } catch(Exception e) {
         handleStreamerFailure("closeImpl", e);
       }
 
-      // if the last stripe is incomplete, generate and write parity cells
-      writeParityCellsForLastStripe();
-
       for (int i = 0; i < numAllBlocks; i++) {
         final StripedDataStreamer s = setCurrentStreamer(i);
         if (!s.isFailed()) {
@@ -667,4 +639,15 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       setClosed();
     }
   }
+
+  private void enqueueAllCurrentPackets() throws IOException {
+    int idx = streamers.indexOf(getCurrentStreamer());
+    for(int i = 0; i < streamers.size(); i++) {
+      setCurrentStreamer(i);
+      if (currentPacket != null) {
+        enqueueCurrentPacket();
+      }
+    }
+    setCurrentStreamer(idx);
+  }
 }