You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/09/06 20:45:32 UTC

[07/17] hadoop git commit: HDFS-11882. Precisely calculate acked length of striped block groups in updatePipeline.

HDFS-11882. Precisely calculate acked length of striped block groups in updatePipeline.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/86abf484
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/86abf484
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/86abf484

Branch: refs/heads/HDFS-7240
Commit: 86abf484546b63afd6640fc7a9fc9c3ddb56e2bd
Parents: df93186
Author: Andrew Wang <wa...@apache.org>
Authored: Tue Sep 5 14:16:03 2017 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Wed Sep 6 12:14:48 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 160 +++++++++++++++++--
 .../TestDFSStripedOutputStreamWithFailure.java  |  77 ++++++++-
 2 files changed, 222 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/86abf484/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index d5206d1..408b325 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -641,7 +641,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
       // wait till all the healthy streamers to
       // 1) get the updated block info
       // 2) create new block outputstream
-      newFailed = waitCreatingNewStreams(healthySet);
+      newFailed = waitCreatingStreamers(healthySet);
       if (newFailed.size() + failedStreamers.size() >
           numAllBlocks - numDataBlocks) {
         throw new IOException(
@@ -668,6 +668,14 @@ public class DFSStripedOutputStream extends DFSOutputStream
     }
   }
 
+  /**
+   * Check if the streamers were successfully updated, adding failed streamers
+   * to the <i>failed</i> return parameter.
+   * @param failed Return parameter containing failed streamers from
+   *               <i>streamers</i>.
+   * @param streamers Set of streamers that are being updated
+   * @return total number of successful updates and failures
+   */
   private int checkStreamerUpdates(Set<StripedDataStreamer> failed,
       Set<StripedDataStreamer> streamers) {
     for (StripedDataStreamer streamer : streamers) {
@@ -682,7 +690,15 @@ public class DFSStripedOutputStream extends DFSOutputStream
     return coordinator.updateStreamerMap.size() + failed.size();
   }
 
-  private Set<StripedDataStreamer> waitCreatingNewStreams(
+  /**
+   * Waits for streamers to be created.
+   *
+   * @param healthyStreamers Set of healthy streamers
+   * @return Set of streamers that failed.
+   *
+   * @throws IOException
+   */
+  private Set<StripedDataStreamer> waitCreatingStreamers(
       Set<StripedDataStreamer> healthyStreamers) throws IOException {
     Set<StripedDataStreamer> failed = new HashSet<>();
     final int expectedNum = healthyStreamers.size();
@@ -773,9 +789,10 @@ public class DFSStripedOutputStream extends DFSOutputStream
       }
     }
 
-    // should update the block group length based on the acked length
+    // Update the NameNode with the acked length of the block group
+    // Save and restore the unacked length
     final long sentBytes = currentBlockGroup.getNumBytes();
-    final long ackedBytes = getNumAckedStripes() * cellSize * numDataBlocks;
+    final long ackedBytes = getAckedLength();
     Preconditions.checkState(ackedBytes <= sentBytes,
         "Acked:" + ackedBytes + ", Sent:" + sentBytes);
     currentBlockGroup.setNumBytes(ackedBytes);
@@ -787,23 +804,140 @@ public class DFSStripedOutputStream extends DFSOutputStream
   }
 
   /**
-   * Get the number of acked stripes. An acked stripe means at least data block
-   * number size cells of the stripe were acked.
+   * Return the length of each block in the block group.
+   * A block whose streamer is unhealthy or has no block yet has length -1.
+   *
+   * @return List of block lengths, ordered by streamer index.
    */
-  private long getNumAckedStripes() {
-    int minStripeNum = Integer.MAX_VALUE;
+  private List<Long> getBlockLengths() {
+    List<Long> blockLengths = new ArrayList<>(numAllBlocks);
     for (int i = 0; i < numAllBlocks; i++) {
       final StripedDataStreamer streamer = getStripedDataStreamer(i);
+      long numBytes = -1;
       if (streamer.isHealthy()) {
-        int curStripeNum = 0;
         if (streamer.getBlock() != null) {
-          curStripeNum = (int) (streamer.getBlock().getNumBytes() / cellSize);
+          numBytes = streamer.getBlock().getNumBytes();
         }
-        minStripeNum = Math.min(curStripeNum, minStripeNum);
+      }
+      blockLengths.add(numBytes);
+    }
+    return blockLengths;
+  }
+
+  /**
+   * Get the length of acked bytes in the block group.
+   * <p>
+   *   A full stripe is acked when at least numDataBlocks streamers have
+   *   the corresponding cells of the stripe, and all previous full stripes are
+   *   also acked. This enforces the constraint that there is at most one
+   *   partial stripe.
+   * </p>
+   * <p>
+   *   Partial stripes write all parity cells. Empty data cells are not written.
+   *   Parity cells are the length of the longest data cell(s). For example,
+   *   with RS(3,2), if we have data cells with lengths [1MB, 64KB, 0], the
+   *   parity blocks will be length [1MB, 1MB].
+   * </p>
+   * <p>
+   *   To be considered acked, a partial stripe needs at least numDataBlocks
+   *   empty or written cells.
+   * </p>
+   * <p>
+   *   Currently, partial stripes can only happen when closing the file at a
+   *   non-stripe boundary, but this could also happen during (currently
+   *   unimplemented) hflush/hsync support.
+   * </p>
+   * @return the acked length of the block group, in bytes.
+   */
+  private long getAckedLength() {
+    // Determine the number of full stripes that are sufficiently durable
+    final long sentBytes = currentBlockGroup.getNumBytes();
+    final long numFullStripes = sentBytes / numDataBlocks / cellSize;
+    final long fullStripeLength = numFullStripes * numDataBlocks * cellSize;
+    assert fullStripeLength <= sentBytes : "Full stripe length can't be " +
+        "greater than the block group length";
+
+    long ackedLength = 0;
+
+    // Determine the length contained by at least `numDataBlocks` blocks.
+    // Since it's sorted, all the blocks after `offset` are at least as long,
+    // and there are exactly `numDataBlocks` entries at or after `offset`.
+    List<Long> blockLengths = Collections.unmodifiableList(getBlockLengths());
+    List<Long> sortedBlockLengths = new ArrayList<>(blockLengths);
+    Collections.sort(sortedBlockLengths);
+    if (numFullStripes > 0) {
+      final int offset = sortedBlockLengths.size() - numDataBlocks;
+      ackedLength = sortedBlockLengths.get(offset) * numDataBlocks;
+    }
+
+    // If the acked length is less than the expected full stripe length, then
+    // we're missing a full stripe. Return the acked length.
+    if (ackedLength < fullStripeLength) {
+      return ackedLength;
+    }
+    // If the expected length is exactly a stripe boundary, then we're also done
+    if (ackedLength == sentBytes) {
+      return ackedLength;
+    }
+
+    /*
+    Otherwise, we're potentially dealing with a partial stripe.
+    The partial stripe is laid out as follows:
+      0 or more full data cells, `cellSize` in length.
+      0 or 1 partial data cells.
+      0 or more empty data cells.
+      `numParityBlocks` parity cells, the length of the longest data cell.
+
+    If the partial stripe is sufficiently acked, we'll update the ackedLength.
+    */
+
+    // How many full and empty data cells do we expect?
+    final int numFullDataCells = (int)
+        ((sentBytes - fullStripeLength) / cellSize);
+    // Take the remainder in long arithmetic; narrow only the result to int.
+    final int partialLength = (int) ((sentBytes - fullStripeLength) % cellSize);
+    final int numPartialDataCells = partialLength == 0 ? 0 : 1;
+    final int numEmptyDataCells = numDataBlocks - numFullDataCells -
+        numPartialDataCells;
+    // Calculate the expected length of the parity blocks.
+    final int parityLength = numFullDataCells > 0 ? cellSize : partialLength;
+
+    final long fullStripeBlockOffset = fullStripeLength / numDataBlocks;
+
+    // Iterate through each type of streamer, checking the expected length.
+    long[] expectedBlockLengths = new long[numAllBlocks];
+    int idx = 0;
+    // Full cells
+    for (; idx < numFullDataCells; idx++) {
+      expectedBlockLengths[idx] = fullStripeBlockOffset + cellSize;
+    }
+    // Partial cell
+    for (; idx < numFullDataCells + numPartialDataCells; idx++) {
+      expectedBlockLengths[idx] = fullStripeBlockOffset + partialLength;
+    }
+    // Empty cells
+    for (; idx < numFullDataCells + numPartialDataCells + numEmptyDataCells;
+         idx++) {
+      expectedBlockLengths[idx] = fullStripeBlockOffset;
+    }
+    // Parity cells
+    for (; idx < numAllBlocks; idx++) {
+      expectedBlockLengths[idx] = fullStripeBlockOffset + parityLength;
+    }
+
+    // Check expected lengths against actual streamer lengths.
+    // Update if we have sufficient durability.
+    int numBlocksWithCorrectLength = 0;
+    for (int i = 0; i < numAllBlocks; i++) {
+      if (blockLengths.get(i) == expectedBlockLengths[i]) {
+        numBlocksWithCorrectLength++;
       }
     }
-    assert minStripeNum != Integer.MAX_VALUE;
-    return minStripeNum;
+    if (numBlocksWithCorrectLength >= numDataBlocks) {
+      ackedLength = sentBytes;
+    }
+
+    return ackedLength;
   }
 
   private int stripeDataSize() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86abf484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 9915a2f..f63a353 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -61,6 +60,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
 
 /**
  * Test striped file write operation with data node failures.
@@ -391,6 +391,79 @@ public class TestDFSStripedOutputStreamWithFailure {
   }
 
   /**
+   * Test writing files of different lengths when the two DataNodes with
+   * partial data blocks fail.
+   */
+  @Test
+  public void runTestWithDifferentLengths() throws Exception {
+    assumeTrue("Skip this test case in the subclasses. Once is enough.",
+        this.getClass().equals(TestDFSStripedOutputStreamWithFailure.class));
+
+    final HdfsConfiguration conf = newHdfsConfiguration();
+    final int[] fileLengths = {
+        // Full stripe then partial on cell boundary
+        cellSize * (dataBlocks * 2 - 2),
+        // Full stripe and a partial on non-cell boundary
+        (cellSize * dataBlocks) + 123,
+    };
+    try {
+      for (int length: fileLengths) {
+        // select the two DNs with partial block to kill
+        final int[] dnIndex = {dataBlocks - 2, dataBlocks - 1};
+        final int[] killPos = getKillPositions(length, dnIndex.length);
+        try {
+          LOG.info("runTestWithDifferentLengths: length==" + length
+              + ", killPos=" + Arrays.toString(killPos)
+              + ", dnIndex=" + Arrays.toString(dnIndex));
+          setup(conf);
+          runTest(length, killPos, dnIndex, false);
+        } catch (Throwable e) {
+          final String err = "failed, killPos=" + Arrays.toString(killPos)
+              + ", dnIndex=" + Arrays.toString(dnIndex) + ", length=" + length;
+          LOG.error(err);
+          throw e;
+        }
+      }
+    } finally {
+      tearDown();
+    }
+  }
+
+  /**
+   * Test writing a very short EC file (under one cell) with many failures.
+   */
+  @Test
+  public void runTestWithShortStripe() throws Exception {
+    assumeTrue("Skip this test case in the subclasses. Once is enough.",
+        this.getClass().equals(TestDFSStripedOutputStreamWithFailure.class));
+
+    final HdfsConfiguration conf = newHdfsConfiguration();
+    // Write a file shorter than one cell: a partial stripe with one cell
+    final int length = cellSize - 123;
+    // Kill all but one DN
+    final int[] dnIndex = new int[dataBlocks + parityBlocks - 1];
+    for (int i = 0; i < dnIndex.length; i++) {
+      dnIndex[i] = i;
+    }
+    final int[] killPos = getKillPositions(length, dnIndex.length);
+
+    try {
+      LOG.info("runTestWithShortStripe: length==" + length + ", killPos="
+          + Arrays.toString(killPos) + ", dnIndex="
+          + Arrays.toString(dnIndex));
+      setup(conf);
+      runTest(length, killPos, dnIndex, false);
+    } catch (Throwable e) {
+      final String err = "failed, killPos=" + Arrays.toString(killPos)
+          + ", dnIndex=" + Arrays.toString(dnIndex) + ", length=" + length;
+      LOG.error(err);
+      throw e;
+    } finally {
+      tearDown();
+    }
+  }
+
+  /**
    * runTest implementation.
    * @param length file length
    * @param killPos killing positions in ascending order
@@ -558,7 +631,7 @@ public class TestDFSStripedOutputStreamWithFailure {
 
   private void run(int offset) {
     int base = getBase();
-    Assume.assumeTrue(base >= 0);
+    assumeTrue(base >= 0);
     final int i = offset + base;
     final Integer length = getLength(i);
     if (length == null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org