Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/09/20 04:58:19 UTC

[15/18] hadoop git commit: HDFS-12437. Fix test setup in TestLeaseRecoveryStriped.

HDFS-12437. Fix test setup in TestLeaseRecoveryStriped.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/12d9d7bc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/12d9d7bc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/12d9d7bc

Branch: refs/heads/HDFS-7240
Commit: 12d9d7bc509bca82b8f40301e3dc5ca764be45eb
Parents: 51edaac
Author: Andrew Wang <wa...@apache.org>
Authored: Tue Sep 19 16:42:20 2017 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Tue Sep 19 16:42:20 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/TestLeaseRecoveryStriped.java   | 156 ++++++++++++++-----
 1 file changed, 113 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/12d9d7bc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
index 2846dbf..36ac8b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
@@ -19,8 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -28,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.IOUtils;
@@ -40,34 +40,41 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeoutException;
 
 public class TestLeaseRecoveryStriped {
-  public static final Log LOG = LogFactory
-      .getLog(TestLeaseRecoveryStriped.class);
+  public static final Logger LOG = LoggerFactory
+      .getLogger(TestLeaseRecoveryStriped.class);
 
   private final ErasureCodingPolicy ecPolicy =
       StripedFileTestUtil.getDefaultECPolicy();
   private final int dataBlocks = ecPolicy.getNumDataUnits();
   private final int parityBlocks = ecPolicy.getNumParityUnits();
   private final int cellSize = ecPolicy.getCellSize();
-  private final int stripSize = dataBlocks * cellSize;
-  private final int stripesPerBlock = 15;
+  private final int stripeSize = dataBlocks * cellSize;
+  private final int stripesPerBlock = 4;
   private final int blockSize = cellSize * stripesPerBlock;
   private final int blockGroupSize = blockSize * dataBlocks;
   private static final int bytesPerChecksum = 512;
 
   static {
     GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(DFSStripedOutputStream.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(BlockRecoveryWorker.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.DEBUG);
   }
 
   static private final String fakeUsername = "fakeUser1";
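
The logging change above (commons-logging's Log/LogFactory swapped for slf4j's
Logger/LoggerFactory) is what enables the parameterized log calls that appear
later in this diff. A minimal sketch of the pattern, under a hypothetical
class name:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggingSketch {
      // Mirrors the new TestLeaseRecoveryStriped.LOG field.
      static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

      void demo(int index, long offset) {
        // {} placeholders are formatted only when the level is enabled,
        // unlike an eagerly concatenated commons-logging message.
        LOG.info("Stopping block stream idx {} at file offset {}", index, offset);
      }
    }
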
@@ -83,7 +90,7 @@ public class TestLeaseRecoveryStriped {
   public void setup() throws IOException {
     conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
+    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 60000L);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
         false);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
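
Note the client socket timeout goes up tenfold here, from 6s to 60s. Per the
javadoc added to writePartialBlocks further down, the write must finish before
this timeout or the client marks timed-out streamers as failed; together with
stripesPerBlock dropping from 15 to 4 above, the bigger budget keeps the
deliberately stalled streamers from failing the write early. The one line that
matters, for reference:

    // Was 6000L, which the test could overrun now that it drains every
    // surviving streamer before aborting (see the new finally block below).
    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 60000L);
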
@@ -104,78 +111,118 @@ public class TestLeaseRecoveryStriped {
     }
   }
 
-  private int[][][] getBlockLengthsSuite() {
+  private static class BlockLengths {
+    private final int[] blockLengths;
+    private final long safeLength;
+
+    BlockLengths(ErasureCodingPolicy policy, int[] blockLengths) {
+      this.blockLengths = blockLengths;
+      long[] longArray = Arrays.stream(blockLengths).asLongStream().toArray();
+      this.safeLength = StripedBlockUtil.getSafeLength(policy, longArray);
+    }
+
+    @Override
+    public String toString() {
+      return new ToStringBuilder(this)
+          .append("blockLengths", getBlockLengths())
+          .append("safeLength", getSafeLength())
+          .toString();
+    }
+
+    /**
+     * Length of each block in a block group.
+     */
+    public int[] getBlockLengths() {
+      return blockLengths;
+    }
+
+    /**
+     * Safe length, calculated by the block lengths.
+     */
+    public long getSafeLength() {
+      return safeLength;
+    }
+  }
+
+  private BlockLengths[] getBlockLengthsSuite() {
     final int groups = 4;
-    final int minNumCell = 3;
-    final int maxNumCell = 11;
+    final int minNumCell = 1;
+    final int maxNumCell = stripesPerBlock;
     final int minNumDelta = -4;
     final int maxNumDelta = 2;
-    int delta = 0;
-    int[][][] blkLenSuite = new int[groups][][];
+    BlockLengths[] suite = new BlockLengths[groups];
     Random random = ThreadLocalRandom.current();
-    for (int i = 0; i < blkLenSuite.length; i++) {
-      if (i == blkLenSuite.length - 1) {
-        delta = bytesPerChecksum;
-      }
-      int[][] suite = new int[2][];
-      int[] lens = new int[dataBlocks + parityBlocks];
-      long[] lenInLong = new long[lens.length];
-      for (int j = 0; j < lens.length; j++) {
+    for (int i = 0; i < groups; i++) {
+      int[] blockLengths = new int[dataBlocks + parityBlocks];
+      for (int j = 0; j < blockLengths.length; j++) {
+        // Choose a random number of cells for the block
         int numCell = random.nextInt(maxNumCell - minNumCell + 1) + minNumCell;
-        int numDelta = j < dataBlocks ?
-            random.nextInt(maxNumDelta - minNumDelta + 1) + minNumDelta : 0;
-        lens[j] = cellSize * numCell + delta * numDelta;
-        lenInLong[j] = lens[j];
+        // For data blocks, jitter the length a bit
+        int numDelta = 0;
+        if (i == groups - 1 && j < dataBlocks) {
+          numDelta = random.nextInt(maxNumDelta - minNumDelta + 1) +
+              minNumDelta;
+        }
+        blockLengths[j] = (cellSize * numCell) + (bytesPerChecksum * numDelta);
       }
-      suite[0] = lens;
-      suite[1] = new int[]{
-          (int) StripedBlockUtil.getSafeLength(ecPolicy, lenInLong)};
-      blkLenSuite[i] = suite;
+      suite[i] = new BlockLengths(ecPolicy, blockLengths);
     }
-    return blkLenSuite;
+    return suite;
   }
 
-  private final int[][][] blockLengthsSuite = getBlockLengthsSuite();
+  private final BlockLengths[] blockLengthsSuite = getBlockLengthsSuite();
 
   @Test
   public void testLeaseRecovery() throws Exception {
+    LOG.info("blockLengthsSuite: " +
+        Arrays.toString(blockLengthsSuite));
     for (int i = 0; i < blockLengthsSuite.length; i++) {
-      int[] blockLengths = blockLengthsSuite[i][0];
-      int safeLength = blockLengthsSuite[i][1][0];
+      BlockLengths blockLengths = blockLengthsSuite[i];
       try {
-        runTest(blockLengths, safeLength);
+        runTest(blockLengths.getBlockLengths(), blockLengths.getSafeLength());
       } catch (Throwable e) {
         String msg = "failed testCase at i=" + i + ", blockLengths="
-            + Arrays.toString(blockLengths) + "\n"
+            + blockLengths + "\n"
             + StringUtils.stringifyException(e);
         Assert.fail(msg);
       }
     }
   }
 
-  private void runTest(int[] blockLengths, int safeLength) throws Exception {
+  private void runTest(int[] blockLengths, long safeLength) throws Exception {
     writePartialBlocks(blockLengths);
     recoverLease();
 
     List<Long> oldGS = new ArrayList<>();
     oldGS.add(1001L);
-    StripedFileTestUtil.checkData(dfs, p, safeLength,
+    StripedFileTestUtil.checkData(dfs, p, (int)safeLength,
         new ArrayList<DatanodeInfo>(), oldGS, blockGroupSize);
     // After recovery, storages are reported by primary DN. we should verify
     // storages reported by blockReport.
     cluster.restartNameNode(true);
     cluster.waitFirstBRCompleted(0, 10000);
-    StripedFileTestUtil.checkData(dfs, p, safeLength,
+    StripedFileTestUtil.checkData(dfs, p, (int)safeLength,
         new ArrayList<DatanodeInfo>(), oldGS, blockGroupSize);
   }
 
+  /**
+   * Write a file with blocks of different lengths.
+   *
+   * This method depends on completing before the DFS socket timeout.
+   * Otherwise, the client will mark timed-out streamers as failed, and the
+   * write will fail if there are too many failed streamers.
+   *
+   * @param blockLengths lengths of blocks to write
+   * @throws Exception
+   */
   private void writePartialBlocks(int[] blockLengths) throws Exception {
     final FSDataOutputStream out = dfs.create(p);
     final DFSStripedOutputStream stripedOut = (DFSStripedOutputStream) out
         .getWrappedStream();
-    int length = (stripesPerBlock - 1) * stripSize;
+    int length = (stripesPerBlock - 1) * stripeSize;
     int[] posToKill = getPosToKill(blockLengths);
     int checkingPos = nextCheckingPos(posToKill, 0);
+    Set<Integer> stoppedStreamerIndexes = new HashSet<>();
     try {
       for (int pos = 0; pos < length; pos++) {
         out.write(StripedFileTestUtil.getByte(pos));
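
The BlockLengths value class above is the core of the cleanup: a block-length
array and its derived safe length now travel together instead of being
smuggled through an int[][][], and its toString() makes failing cases legible.
Below is a self-contained sketch of the new length-generation scheme; the
RS(6,3) layout and 1 MB cell are assumptions for illustration only (the real
test reads them from the erasure coding policy and derives the safe length via
StripedBlockUtil.getSafeLength):

    import java.util.Arrays;
    import java.util.Random;
    import java.util.concurrent.ThreadLocalRandom;

    public class BlockLengthsSketch {
      // Assumed layout, mirroring the common RS-6-3-1024k policy.
      static final int DATA_BLOCKS = 6, PARITY_BLOCKS = 3;
      static final int CELL_SIZE = 1024 * 1024;
      static final int STRIPES_PER_BLOCK = 4;   // was 15 before this commit
      static final int BYTES_PER_CHECKSUM = 512;

      public static void main(String[] args) {
        final int groups = 4;
        final int minNumCell = 1, maxNumCell = STRIPES_PER_BLOCK;
        final int minNumDelta = -4, maxNumDelta = 2;
        Random random = ThreadLocalRandom.current();
        for (int i = 0; i < groups; i++) {
          int[] lengths = new int[DATA_BLOCKS + PARITY_BLOCKS];
          for (int j = 0; j < lengths.length; j++) {
            // Every block gets a random whole number of cells.
            int numCell = random.nextInt(maxNumCell - minNumCell + 1) + minNumCell;
            // Last group only: jitter data-block lengths by a few
            // checksum-sized chunks, so sub-cell boundaries get covered.
            int numDelta = 0;
            if (i == groups - 1 && j < DATA_BLOCKS) {
              numDelta = random.nextInt(maxNumDelta - minNumDelta + 1) + minNumDelta;
            }
            lengths[j] = CELL_SIZE * numCell + BYTES_PER_CHECKSUM * numDelta;
          }
          System.out.println("group " + i + ": " + Arrays.toString(lengths));
        }
      }
    }
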
@@ -183,15 +230,31 @@ public class TestLeaseRecoveryStriped {
           for (int index : getIndexToStop(posToKill, pos)) {
             out.flush();
             stripedOut.enqueueAllCurrentPackets();
+            LOG.info("Stopping block stream idx {} at file offset {} block " +
+                    "length {}", index, pos, blockLengths[index]);
             StripedDataStreamer s = stripedOut.getStripedDataStreamer(index);
             waitStreamerAllAcked(s);
             waitByteSent(s, blockLengths[index]);
             stopBlockStream(s);
+            stoppedStreamerIndexes.add(index);
           }
           checkingPos = nextCheckingPos(posToKill, pos);
         }
       }
     } finally {
+      // Flush everything
+      out.flush();
+      stripedOut.enqueueAllCurrentPackets();
+      // Wait for streamers that weren't killed above to be written out
+      for (int i=0; i< blockLengths.length; i++) {
+        if (stoppedStreamerIndexes.contains(i)) {
+          continue;
+        }
+        StripedDataStreamer s = stripedOut.getStripedDataStreamer(i);
+        LOG.info("Waiting for block stream idx {} to reach length {}", i,
+            blockLengths[i]);
+        waitStreamerAllAcked(s);
+      }
       DFSTestUtil.abortStream(stripedOut);
     }
   }
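
The new finally block looks like the heart of the setup fix: streamers that
were not deliberately stalled are now drained and fully acknowledged, so their
on-disk block lengths actually match blockLengths[i] before the stream is
abandoned. An annotated outline of the ordering, using the test's own helper
names (a sketch of the shape, not a drop-in body):

    out.flush();
    stripedOut.enqueueAllCurrentPackets();  // push buffered packets out
    for (int i = 0; i < blockLengths.length; i++) {
      if (stoppedStreamerIndexes.contains(i)) {
        continue;  // already stalled at its computed kill position
      }
      // Surviving streamers must be fully acked so the datanode-side block
      // really reaches length blockLengths[i].
      waitStreamerAllAcked(stripedOut.getStripedDataStreamer(i));
    }
    DFSTestUtil.abortStream(stripedOut);  // leave the file unclosed, so
                                          // lease recovery has work to do
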
@@ -210,7 +273,7 @@ public class TestLeaseRecoveryStriped {
     int[] posToKill = new int[dataBlocks + parityBlocks];
     for (int i = 0; i < dataBlocks; i++) {
       int numStripe = (blockLengths[i] - 1) / cellSize;
-      posToKill[i] = numStripe * stripSize + i * cellSize
+      posToKill[i] = numStripe * stripeSize + i * cellSize
           + blockLengths[i] % cellSize;
       if (blockLengths[i] % cellSize == 0) {
         posToKill[i] += cellSize;
@@ -220,7 +283,7 @@ public class TestLeaseRecoveryStriped {
         + parityBlocks; i++) {
       Preconditions.checkArgument(blockLengths[i] % cellSize == 0);
       int numStripe = (blockLengths[i]) / cellSize;
-      posToKill[i] = numStripe * stripSize;
+      posToKill[i] = numStripe * stripeSize;
     }
     return posToKill;
   }
@@ -243,13 +306,20 @@ public class TestLeaseRecoveryStriped {
         public Boolean get() {
           return s.bytesSent >= byteSent;
         }
-      }, 100, 3000);
+      }, 100, 30000);
     } catch (TimeoutException e) {
       throw new IOException("Timeout waiting for streamer " + s + ". Sent="
           + s.bytesSent + ", expected=" + byteSent);
     }
   }
 
+  /**
+   * Stop the block stream without immediately inducing a hard failure.
+   * Packets can continue to be queued until the streamer hits a socket timeout.
+   *
+   * @param s
+   * @throws Exception
+   */
   private void stopBlockStream(StripedDataStreamer s) throws Exception {
     IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
     Whitebox.setInternalState(s, "blockStream",
@@ -257,8 +327,8 @@ public class TestLeaseRecoveryStriped {
   }
 
   private void recoverLease() throws Exception {
-    final DistributedFileSystem dfs2 = (DistributedFileSystem) getFSAsAnotherUser(
-        conf);
+    final DistributedFileSystem dfs2 =
+        (DistributedFileSystem) getFSAsAnotherUser(conf);
     try {
       GenericTestUtils.waitFor(new Supplier<Boolean>() {
         @Override

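The stopBlockStream javadoc above is worth pausing on: the "stop" is
deliberately soft. The streamer's socket-backed stream is swapped, via
Mockito's Whitebox reflection helper, for a sink that discards bytes, so
writes keep "succeeding" locally while nothing reaches the datanode, and a
hard failure only arrives once the (now 60s) socket timeout fires. The trick,
with names as in the diff:

    // Replace the private blockStream field with a stream that swallows
    // every byte; packets keep queueing instead of failing immediately.
    Whitebox.setInternalState(s, "blockStream",
        new DataOutputStream(new IOUtils.NullOutputStream()));

For anyone reproducing this locally, the test can typically be run on its own
from hadoop-hdfs-project/hadoop-hdfs with:

    mvn test -Dtest=TestLeaseRecoveryStriped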
