Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2017/12/07 08:53:16 UTC

[1/2] hadoop git commit: HDFS-11576. Block recovery will fail indefinitely if recovery time > heartbeat interval. Contributed by Lukas Majercak

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 046424cf8 -> afc606c3a
  refs/heads/branch-2.9 89314b78d -> ae3dc443f


HDFS-11576. Block recovery will fail indefinitely if recovery time > heartbeat interval. Contributed by Lukas Majercak
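
In short: before this change, every pass through FSNamesystem#internalReleaseLease allocated a fresh generation stamp and issued a new recovery of the last block, which could be re-issued faster than a slow DataNode could complete the previous attempt; the eventual commitBlockSynchronization then carried an out-of-date recovery id and was rejected, so the block could stay under recovery indefinitely. The patch records each attempt in a new PendingRecoveryBlocks structure and only re-issues a recovery once the previous attempt has timed out. A rough worked example of the resulting timeout, assuming the stock default of 3 seconds for dfs.heartbeat.interval (the default value itself is not part of this diff):

  // Mirrors BlockManager#getBlockRecoveryTimeout below; illustrative only.
  long heartbeatIntervalSecs = 3;   // dfs.heartbeat.interval
  long recoveryTimeoutMs = java.util.concurrent.TimeUnit.SECONDS.toMillis(
      heartbeatIntervalSecs * 30);  // BLOCK_RECOVERY_TIMEOUT_MULTIPLIER
  // recoveryTimeoutMs == 90000: a second recovery of the same block is only
  // scheduled if the first has not been committed within 90 seconds.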


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/afc606c3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/afc606c3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/afc606c3

Branch: refs/heads/branch-2
Commit: afc606c3aa9fc886ff2c17ec9a6a83e30c1adad0
Parents: 046424c
Author: Chris Douglas <cd...@apache.org>
Authored: Thu Dec 7 00:43:21 2017 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Thu Dec 7 00:43:21 2017 -0800

----------------------------------------------------------------------
 .../apache/hadoop/test/GenericTestUtils.java    |  10 +-
 .../server/blockmanagement/BlockManager.java    |  41 ++++++
 .../blockmanagement/PendingRecoveryBlocks.java  | 143 +++++++++++++++++++
 .../hdfs/server/namenode/FSNamesystem.java      |  40 +++---
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  |  10 ++
 .../TestPendingRecoveryBlocks.java              |  87 +++++++++++
 .../hdfs/server/datanode/TestBlockRecovery.java | 108 +++++++++++++-
 .../namenode/ha/TestPipelinesFailover.java      |   4 +-
 8 files changed, 422 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/afc606c3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index 9291bb0..af47d29 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -625,10 +625,16 @@ public abstract class GenericTestUtils {
    * conditions.
    */
   public static class SleepAnswer implements Answer<Object> {
+    private final int minSleepTime;
     private final int maxSleepTime;
     private static Random r = new Random();
-    
+
     public SleepAnswer(int maxSleepTime) {
+      this(0, maxSleepTime);
+    }
+
+    public SleepAnswer(int minSleepTime, int maxSleepTime) {
+      this.minSleepTime = minSleepTime;
       this.maxSleepTime = maxSleepTime;
     }
     
@@ -636,7 +642,7 @@ public abstract class GenericTestUtils {
     public Object answer(InvocationOnMock invocation) throws Throwable {
       boolean interrupted = false;
       try {
-        Thread.sleep(r.nextInt(maxSleepTime));
+        Thread.sleep(r.nextInt(maxSleepTime) + minSleepTime);
       } catch (InterruptedException ie) {
         interrupted = true;
       }
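
The new two-argument constructor keeps the existing sleep expression, so each stubbed call sleeps minSleepTime plus a random value below maxSleepTime. A minimal usage sketch (the nnSpy setup is assumed context here; the full version appears in the TestBlockRecovery hunk further down):

  // With these arguments every stubbed call sleeps somewhere in
  // [3000, 3000 + 6000) milliseconds before returning.
  SleepAnswer delayer = new SleepAnswer(3000, 6000);
  Mockito.doAnswer(delayer).when(nnSpy).commitBlockSynchronization(
      Mockito.any(ExtendedBlock.class), Mockito.anyInt(), Mockito.anyLong(),
      Mockito.anyBoolean(), Mockito.anyBoolean(),
      Mockito.any(DatanodeID[].class), Mockito.any(String[].class));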

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afc606c3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index bdb926c..4a8c8c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -43,6 +43,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.ObjectName;
@@ -149,6 +150,8 @@ public class BlockManager implements BlockStatsMXBean {
   private static final String QUEUE_REASON_FUTURE_GENSTAMP =
     "generation stamp is in the future";
 
+  private static final long BLOCK_RECOVERY_TIMEOUT_MULTIPLIER = 30;
+
   private final Namesystem namesystem;
 
   private final BlockManagerSafeMode bmSafeMode;
@@ -267,6 +270,9 @@ public class BlockManager implements BlockStatsMXBean {
   @VisibleForTesting
   final PendingReplicationBlocks pendingReplications;
 
+  /** Stores information about block recovery attempts. */
+  private final PendingRecoveryBlocks pendingRecoveryBlocks;
+
   /** The maximum number of replicas allowed for a block */
   public final short maxReplication;
   /**
@@ -446,6 +452,12 @@ public class BlockManager implements BlockStatsMXBean {
     }
     this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
 
+    long heartbeatIntervalSecs = conf.getTimeDuration(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
+    long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
+    pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);
+
     this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
 
     bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@@ -4114,6 +4126,25 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  /**
+   * Notification of a successful block recovery.
+   * @param block for which the recovery succeeded
+   */
+  public void successfulBlockRecovery(BlockInfo block) {
+    pendingRecoveryBlocks.remove(block);
+  }
+
+  /**
+   * Checks whether a recovery attempt has been made for the given block.
+   * If so, checks whether that attempt has timed out.
+   * @param b block for which recovery is being attempted
+   * @return true if no recovery attempt has been made or
+   *         the previous attempt timed out
+   */
+  public boolean addBlockRecoveryAttempt(BlockInfo b) {
+    return pendingRecoveryBlocks.add(b);
+  }
+
   @VisibleForTesting
   public void flushBlockOps() throws IOException {
     runBlockOp(new Callable<Void>(){
@@ -4222,4 +4253,14 @@ public class BlockManager implements BlockStatsMXBean {
   boolean isReplicaCorrupt(BlockInfo blk, DatanodeDescriptor d) {
     return corruptReplicas.isReplicaCorrupt(blk, d);
   }
+
+  private static long getBlockRecoveryTimeout(long heartbeatIntervalSecs) {
+    return TimeUnit.SECONDS.toMillis(heartbeatIntervalSecs *
+        BLOCK_RECOVERY_TIMEOUT_MULTIPLIER);
+  }
+
+  @VisibleForTesting
+  public void setBlockRecoveryTimeout(long blockRecoveryTimeout) {
+    pendingRecoveryBlocks.setRecoveryTimeoutInterval(blockRecoveryTimeout);
+  }
 }
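
The recovery timeout is derived from dfs.heartbeat.interval rather than introducing a new configuration key, and tests can shrink it through the @VisibleForTesting setter. A short sketch of the test-side override, condensed from the TestBlockRecovery change later in this patch (the 15-second value is just the example used there):

  Configuration conf = new HdfsConfiguration();
  conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
  cluster.waitActive();
  // Override the derived timeout (heartbeat * 30) so recovery retries
  // fire quickly in tests.
  cluster.setBlockRecoveryTimeout(TimeUnit.SECONDS.toMillis(15));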

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afc606c3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
new file mode 100644
index 0000000..3f5f27c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * PendingRecoveryBlocks tracks recovery attempts for each block and their
+ * timeouts to ensure we do not have multiple recoveries at the same time
+ * and retry only after the timeout for a recovery has expired.
+ */
+class PendingRecoveryBlocks {
+  private static final Logger LOG = BlockManager.LOG;
+
+  /** List of recovery attempts per block and the time they expire. */
+  private final LightWeightHashSet<BlockRecoveryAttempt> recoveryTimeouts =
+      new LightWeightHashSet<>();
+
+  /** The timeout for issuing a block recovery again.
+   * (it should be larger than the time to recover a block)
+   */
+  private long recoveryTimeoutInterval;
+
+  PendingRecoveryBlocks(long timeout) {
+    this.recoveryTimeoutInterval = timeout;
+  }
+
+  /**
+   * Remove recovery attempt for the given block.
+   * @param block whose recovery attempt to remove.
+   */
+  synchronized void remove(BlockInfo block) {
+    recoveryTimeouts.remove(new BlockRecoveryAttempt(block));
+  }
+
+  /**
+   * Checks whether a recovery attempt has been made for the given block.
+   * If so, checks whether that attempt has timed out.
+   * @param block block for which recovery is being attempted
+   * @return true if no recovery attempt has been made or
+   *         the previous attempt timed out
+   */
+  synchronized boolean add(BlockInfo block) {
+    boolean added = false;
+    long curTime = getTime();
+    BlockRecoveryAttempt recoveryAttempt =
+        recoveryTimeouts.getElement(new BlockRecoveryAttempt(block));
+
+    if (recoveryAttempt == null) {
+      BlockRecoveryAttempt newAttempt = new BlockRecoveryAttempt(
+          block, curTime + recoveryTimeoutInterval);
+      added = recoveryTimeouts.add(newAttempt);
+    } else if (recoveryAttempt.hasTimedOut(curTime)) {
+      // Previous attempt timed out, reset the timeout
+      recoveryAttempt.setTimeout(curTime + recoveryTimeoutInterval);
+      added = true;
+    } else {
+      long timeoutIn = TimeUnit.MILLISECONDS.toSeconds(
+          recoveryAttempt.timeoutAt - curTime);
+      LOG.info("Block recovery attempt for " + block + " rejected, as the " +
+          "previous attempt times out in " + timeoutIn + " seconds.");
+    }
+    return added;
+  }
+
+  /**
+   * Check whether the given block is under recovery.
+   * @param b block for which to check
+   * @return true if the given block is being recovered
+   */
+  synchronized boolean isUnderRecovery(BlockInfo b) {
+    BlockRecoveryAttempt recoveryAttempt =
+        recoveryTimeouts.getElement(new BlockRecoveryAttempt(b));
+    return recoveryAttempt != null;
+  }
+
+  long getTime() {
+    return Time.monotonicNow();
+  }
+
+  @VisibleForTesting
+  synchronized void setRecoveryTimeoutInterval(long recoveryTimeoutInterval) {
+    this.recoveryTimeoutInterval = recoveryTimeoutInterval;
+  }
+
+  /**
+   * Tracks timeout for block recovery attempt of a given block.
+   */
+  private static class BlockRecoveryAttempt {
+    private final BlockInfo blockInfo;
+    private long timeoutAt;
+
+    private BlockRecoveryAttempt(BlockInfo blockInfo) {
+      this(blockInfo, 0);
+    }
+
+    BlockRecoveryAttempt(BlockInfo blockInfo, long timeoutAt) {
+      this.blockInfo = blockInfo;
+      this.timeoutAt = timeoutAt;
+    }
+
+    boolean hasTimedOut(long currentTime) {
+      return currentTime > timeoutAt;
+    }
+
+    void setTimeout(long newTimeoutAt) {
+      this.timeoutAt = newTimeoutAt;
+    }
+
+    @Override
+    public int hashCode() {
+      return blockInfo.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof BlockRecoveryAttempt) {
+        return this.blockInfo.equals(((BlockRecoveryAttempt) obj).blockInfo);
+      }
+      return false;
+    }
+  }
+}
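
Usage-wise, PendingRecoveryBlocks acts as a per-block gate on starting a recovery: add() returns true only when no attempt is pending or the pending one has expired, and BlockRecoveryAttempt's equals/hashCode delegate to the block, so a later lookup with a fresh wrapper still finds the stored entry. A minimal sketch of the intended call pattern (the sketch assumes it runs in the blockmanagement package, since the class is package-private, and lastBlock is an assumed BlockInfo; the real call sites are in BlockManager and FSNamesystem below):

  PendingRecoveryBlocks pending = new PendingRecoveryBlocks(90000L);
  if (pending.add(lastBlock)) {
    // Safe to issue a (re)recovery for lastBlock: either none was pending
    // or the previous attempt ran past the 90-second window.
  }
  assert pending.isUnderRecovery(lastBlock);
  // Once commitBlockSynchronization succeeds for this block:
  pending.remove(lastBlock);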

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afc606c3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index e21da7f..50e9420 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3240,25 +3240,30 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             + "Removed empty last block and closed file " + src);
         return true;
       }
-      // start recovery of the last block for this file
-      long blockRecoveryId = nextGenerationStamp(
-          blockManager.isLegacyBlock(lastBlock));
-      lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
-      if(copyOnTruncate) {
-        lastBlock.setGenerationStamp(blockRecoveryId);
-      } else if(truncateRecovery) {
-        recoveryBlock.setGenerationStamp(blockRecoveryId);
-      }
-      uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
-      leaseManager.renewLease(lease);
-      // Cannot close file right now, since the last block requires recovery.
-      // This may potentially cause infinite loop in lease recovery
-      // if there are no valid replicas on data-nodes.
-      NameNode.stateChangeLog.warn(
-                "DIR* NameSystem.internalReleaseLease: " +
+      // Start recovery of the last block for this file
+      // Only do so if there is no ongoing recovery for this block,
+      // or the previous recovery for this block timed out.
+      if (blockManager.addBlockRecoveryAttempt(lastBlock)) {
+        long blockRecoveryId = nextGenerationStamp(
+            blockManager.isLegacyBlock(lastBlock));
+        if(copyOnTruncate) {
+          lastBlock.setGenerationStamp(blockRecoveryId);
+        } else if(truncateRecovery) {
+          recoveryBlock.setGenerationStamp(blockRecoveryId);
+        }
+        uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
+
+        // Cannot close file right now, since the last block requires recovery.
+        // This may potentially cause infinite loop in lease recovery
+        // if there are no valid replicas on data-nodes.
+        NameNode.stateChangeLog.warn(
+            "DIR* NameSystem.internalReleaseLease: " +
                 "File " + src + " has not been closed." +
-               " Lease recovery is in progress. " +
+                " Lease recovery is in progress. " +
                 "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
+      }
+      lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+      leaseManager.renewLease(lease);
       break;
     }
     return false;
@@ -3541,6 +3546,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         // If this commit does not want to close the file, persist blocks
         FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
       }
+      blockManager.successfulBlockRecovery(storedBlock);
     } finally {
       writeUnlock("commitBlockSynchronization");
     }
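
The net effect in FSNamesystem: a new generation stamp is only allocated when addBlockRecoveryAttempt() actually admits a recovery, and commitBlockSynchronization() clears the pending entry via successfulBlockRecovery(), so a slow but eventually successful DataNode response no longer races against a newer recovery id. Lease reassignment and renewal now sit outside the gate, so they still happen when a recovery is already in flight. Roughly, condensed from the two hunks above (comments only):

  // internalReleaseLease:
  //   if (blockManager.addBlockRecoveryAttempt(lastBlock)) {   // gate
  //     blockRecoveryId = nextGenerationStamp(isLegacyBlock);  // bump only now
  //     uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
  //   }
  //   lease = reassignLease(...); leaseManager.renewLease(lease);  // always
  // commitBlockSynchronization:
  //   blockManager.successfulBlockRecovery(storedBlock);       // clear entry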

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afc606c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index a43d4d9..be3394e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -3015,6 +3015,16 @@ public class MiniDFSCluster implements AutoCloseable {
     // Wait for new namenode to get registrations from all the datanodes
     waitActive(nnIndex);
   }
+
+  /**
+   * Sets the timeout for re-issuing a block recovery.
+   */
+  public void setBlockRecoveryTimeout(long timeout) {
+    for (int nnIndex = 0; nnIndex < getNumNameNodes(); nnIndex++) {
+      getNamesystem(nnIndex).getBlockManager().setBlockRecoveryTimeout(
+          timeout);
+    }
+  }
   
   protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
                            boolean checkDataNodeAddrConfig) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afc606c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
new file mode 100644
index 0000000..baad89f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class contains unit tests for PendingRecoveryBlocks.java functionality.
+ */
+public class TestPendingRecoveryBlocks {
+
+  private PendingRecoveryBlocks pendingRecoveryBlocks;
+  private final long recoveryTimeout = 1000L;
+
+  private final BlockInfo blk1 = getBlock(1);
+  private final BlockInfo blk2 = getBlock(2);
+  private final BlockInfo blk3 = getBlock(3);
+
+  @Before
+  public void setUp() {
+    pendingRecoveryBlocks =
+        Mockito.spy(new PendingRecoveryBlocks(recoveryTimeout));
+  }
+
+  BlockInfo getBlock(long blockId) {
+    return new BlockInfoContiguous(new Block(blockId), (short) 0);
+  }
+
+  @Test
+  public void testAddDifferentBlocks() {
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+    assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk1));
+    assertTrue(pendingRecoveryBlocks.add(blk2));
+    assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk2));
+    assertTrue(pendingRecoveryBlocks.add(blk3));
+    assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk3));
+  }
+
+  @Test
+  public void testAddAndRemoveBlocks() {
+    // Add blocks
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+    assertTrue(pendingRecoveryBlocks.add(blk2));
+
+    // Remove blk1
+    pendingRecoveryBlocks.remove(blk1);
+
+    // Adding back blk1 should succeed
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+  }
+
+  @Test
+  public void testAddBlockWithPreviousRecoveryTimedOut() {
+    // Add blk
+    Mockito.doReturn(0L).when(pendingRecoveryBlocks).getTime();
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+
+    // Should fail, has not timed out yet
+    Mockito.doReturn(recoveryTimeout / 2).when(pendingRecoveryBlocks).getTime();
+    assertFalse(pendingRecoveryBlocks.add(blk1));
+
+    // Should succeed after timing out
+    Mockito.doReturn(recoveryTimeout * 2).when(pendingRecoveryBlocks).getTime();
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afc606c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index ab1ad48..1aa538d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -18,7 +18,10 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -41,6 +44,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -64,7 +68,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
@@ -90,6 +93,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
@@ -981,4 +985,106 @@ public class TestBlockRecovery {
       Assert.fail("Thread failure: " + failureReason);
     }
   }
+
+  /**
+   * Test for block recovery taking longer than the heartbeat interval.
+   */
+  @Test(timeout = 300000L)
+  public void testRecoverySlowerThanHeartbeat() throws Exception {
+    tearDown(); // Stop the Mocked DN started in startup()
+
+    SleepAnswer delayer = new SleepAnswer(3000, 6000);
+    testRecoveryWithDatanodeDelayed(delayer);
+  }
+
+  /**
+   * Test for block recovery timeout. All recovery attempts will be delayed
+   * and the first attempt will be lost to trigger recovery timeout and retry.
+   */
+  @Test(timeout = 300000L)
+  public void testRecoveryTimeout() throws Exception {
+    tearDown(); // Stop the Mocked DN started in startup()
+    final Random r = new Random();
+
+    // Make sure first commitBlockSynchronization call from the DN gets lost
+    // for the recovery timeout to expire and new recovery attempt
+    // to be started.
+    SleepAnswer delayer = new SleepAnswer(3000) {
+      private final AtomicBoolean callRealMethod = new AtomicBoolean();
+
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        boolean interrupted = false;
+        try {
+          Thread.sleep(r.nextInt(3000) + 6000);
+        } catch (InterruptedException ie) {
+          interrupted = true;
+        }
+        try {
+          if (callRealMethod.get()) {
+            return invocation.callRealMethod();
+          }
+          callRealMethod.set(true);
+          return null;
+        } finally {
+          if (interrupted) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    };
+    testRecoveryWithDatanodeDelayed(delayer);
+  }
+
+  private void testRecoveryWithDatanodeDelayed(
+      GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
+    Configuration configuration = new HdfsConfiguration();
+    configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(configuration)
+          .numDataNodes(2).build();
+      cluster.waitActive();
+      final FSNamesystem ns = cluster.getNamesystem();
+      final NameNode nn = cluster.getNameNode();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      cluster.setBlockRecoveryTimeout(TimeUnit.SECONDS.toMillis(15));
+
+      // Create a file and never close the output stream to trigger recovery
+      FSDataOutputStream out = dfs.create(new Path("/testSlowRecovery"),
+          (short) 2);
+      out.write(AppendTestUtil.randomBytes(0, 4096));
+      out.hsync();
+
+      List<DataNode> dataNodes = cluster.getDataNodes();
+      for (DataNode datanode : dataNodes) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(datanode, nn);
+
+        Mockito.doAnswer(recoveryDelayer).when(nnSpy).
+            commitBlockSynchronization(
+                Mockito.any(ExtendedBlock.class), Mockito.anyInt(),
+                Mockito.anyLong(), Mockito.anyBoolean(),
+                Mockito.anyBoolean(), Mockito.any(DatanodeID[].class),
+                Mockito.any(String[].class));
+      }
+
+      // Make sure hard lease expires to trigger replica recovery
+      cluster.setLeasePeriod(100L, 100L);
+
+      // Wait for recovery to succeed
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return ns.getCompleteBlocksTotal() > 0;
+        }
+      }, 300, 300000);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afc606c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
index b93004c..dbc264e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
@@ -274,7 +275,7 @@ public class TestPipelinesFailover {
     // Disable permissions so that another user can recover the lease.
     conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    
+
     FSDataOutputStream stm = null;
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
@@ -283,6 +284,7 @@ public class TestPipelinesFailover {
     try {
       cluster.waitActive();
       cluster.transitionToActive(0);
+      cluster.setBlockRecoveryTimeout(TimeUnit.SECONDS.toMillis(1));
       Thread.sleep(500);
 
       LOG.info("Starting with NN 0 active");




[2/2] hadoop git commit: HDFS-11576. Block recovery will fail indefinitely if recovery time > heartbeat interval. Contributed by Lukas Majercak

Posted by cd...@apache.org.
HDFS-11576. Block recovery will fail indefinitely if recovery time > heartbeat interval. Contributed by Lukas Majercak

(cherry picked from commit afc606c3aa9fc886ff2c17ec9a6a83e30c1adad0)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ae3dc443
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ae3dc443
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ae3dc443

Branch: refs/heads/branch-2.9
Commit: ae3dc443f4db1bafba91a864512b42e279714d61
Parents: 89314b7
Author: Chris Douglas <cd...@apache.org>
Authored: Thu Dec 7 00:43:21 2017 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Thu Dec 7 00:47:02 2017 -0800

----------------------------------------------------------------------
 .../apache/hadoop/test/GenericTestUtils.java    |  10 +-
 .../server/blockmanagement/BlockManager.java    |  41 ++++++
 .../blockmanagement/PendingRecoveryBlocks.java  | 143 +++++++++++++++++++
 .../hdfs/server/namenode/FSNamesystem.java      |  40 +++---
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  |  10 ++
 .../TestPendingRecoveryBlocks.java              |  87 +++++++++++
 .../hdfs/server/datanode/TestBlockRecovery.java | 108 +++++++++++++-
 .../namenode/ha/TestPipelinesFailover.java      |   4 +-
 8 files changed, 422 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae3dc443/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index 9291bb0..af47d29 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -625,10 +625,16 @@ public abstract class GenericTestUtils {
    * conditions.
    */
   public static class SleepAnswer implements Answer<Object> {
+    private final int minSleepTime;
     private final int maxSleepTime;
     private static Random r = new Random();
-    
+
     public SleepAnswer(int maxSleepTime) {
+      this(0, maxSleepTime);
+    }
+
+    public SleepAnswer(int minSleepTime, int maxSleepTime) {
+      this.minSleepTime = minSleepTime;
       this.maxSleepTime = maxSleepTime;
     }
     
@@ -636,7 +642,7 @@ public abstract class GenericTestUtils {
     public Object answer(InvocationOnMock invocation) throws Throwable {
       boolean interrupted = false;
       try {
-        Thread.sleep(r.nextInt(maxSleepTime));
+        Thread.sleep(r.nextInt(maxSleepTime) + minSleepTime);
       } catch (InterruptedException ie) {
         interrupted = true;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae3dc443/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index bdb926c..4a8c8c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -43,6 +43,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.ObjectName;
@@ -149,6 +150,8 @@ public class BlockManager implements BlockStatsMXBean {
   private static final String QUEUE_REASON_FUTURE_GENSTAMP =
     "generation stamp is in the future";
 
+  private static final long BLOCK_RECOVERY_TIMEOUT_MULTIPLIER = 30;
+
   private final Namesystem namesystem;
 
   private final BlockManagerSafeMode bmSafeMode;
@@ -267,6 +270,9 @@ public class BlockManager implements BlockStatsMXBean {
   @VisibleForTesting
   final PendingReplicationBlocks pendingReplications;
 
+  /** Stores information about block recovery attempts. */
+  private final PendingRecoveryBlocks pendingRecoveryBlocks;
+
   /** The maximum number of replicas allowed for a block */
   public final short maxReplication;
   /**
@@ -446,6 +452,12 @@ public class BlockManager implements BlockStatsMXBean {
     }
     this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
 
+    long heartbeatIntervalSecs = conf.getTimeDuration(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
+    long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
+    pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);
+
     this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
 
     bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@@ -4114,6 +4126,25 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  /**
+   * Notification of a successful block recovery.
+   * @param block for which the recovery succeeded
+   */
+  public void successfulBlockRecovery(BlockInfo block) {
+    pendingRecoveryBlocks.remove(block);
+  }
+
+  /**
+   * Checks whether a recovery attempt has been made for the given block.
+   * If so, checks whether that attempt has timed out.
+   * @param b block for which recovery is being attempted
+   * @return true if no recovery attempt has been made or
+   *         the previous attempt timed out
+   */
+  public boolean addBlockRecoveryAttempt(BlockInfo b) {
+    return pendingRecoveryBlocks.add(b);
+  }
+
   @VisibleForTesting
   public void flushBlockOps() throws IOException {
     runBlockOp(new Callable<Void>(){
@@ -4222,4 +4253,14 @@ public class BlockManager implements BlockStatsMXBean {
   boolean isReplicaCorrupt(BlockInfo blk, DatanodeDescriptor d) {
     return corruptReplicas.isReplicaCorrupt(blk, d);
   }
+
+  private static long getBlockRecoveryTimeout(long heartbeatIntervalSecs) {
+    return TimeUnit.SECONDS.toMillis(heartbeatIntervalSecs *
+        BLOCK_RECOVERY_TIMEOUT_MULTIPLIER);
+  }
+
+  @VisibleForTesting
+  public void setBlockRecoveryTimeout(long blockRecoveryTimeout) {
+    pendingRecoveryBlocks.setRecoveryTimeoutInterval(blockRecoveryTimeout);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae3dc443/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
new file mode 100644
index 0000000..3f5f27c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * PendingRecoveryBlocks tracks recovery attempts for each block and their
+ * timeouts to ensure we do not have multiple recoveries at the same time
+ * and retry only after the timeout for a recovery has expired.
+ */
+class PendingRecoveryBlocks {
+  private static final Logger LOG = BlockManager.LOG;
+
+  /** List of recovery attempts per block and the time they expire. */
+  private final LightWeightHashSet<BlockRecoveryAttempt> recoveryTimeouts =
+      new LightWeightHashSet<>();
+
+  /** The timeout for issuing a block recovery again.
+   * (it should be larger than the time to recover a block)
+   */
+  private long recoveryTimeoutInterval;
+
+  PendingRecoveryBlocks(long timeout) {
+    this.recoveryTimeoutInterval = timeout;
+  }
+
+  /**
+   * Remove recovery attempt for the given block.
+   * @param block whose recovery attempt to remove.
+   */
+  synchronized void remove(BlockInfo block) {
+    recoveryTimeouts.remove(new BlockRecoveryAttempt(block));
+  }
+
+  /**
+   * Checks whether a recovery attempt has been made for the given block.
+   * If so, checks whether that attempt has timed out.
+   * @param block block for which recovery is being attempted
+   * @return true if no recovery attempt has been made or
+   *         the previous attempt timed out
+   */
+  synchronized boolean add(BlockInfo block) {
+    boolean added = false;
+    long curTime = getTime();
+    BlockRecoveryAttempt recoveryAttempt =
+        recoveryTimeouts.getElement(new BlockRecoveryAttempt(block));
+
+    if (recoveryAttempt == null) {
+      BlockRecoveryAttempt newAttempt = new BlockRecoveryAttempt(
+          block, curTime + recoveryTimeoutInterval);
+      added = recoveryTimeouts.add(newAttempt);
+    } else if (recoveryAttempt.hasTimedOut(curTime)) {
+      // Previous attempt timed out, reset the timeout
+      recoveryAttempt.setTimeout(curTime + recoveryTimeoutInterval);
+      added = true;
+    } else {
+      long timeoutIn = TimeUnit.MILLISECONDS.toSeconds(
+          recoveryAttempt.timeoutAt - curTime);
+      LOG.info("Block recovery attempt for " + block + " rejected, as the " +
+          "previous attempt times out in " + timeoutIn + " seconds.");
+    }
+    return added;
+  }
+
+  /**
+   * Check whether the given block is under recovery.
+   * @param b block for which to check
+   * @return true if the given block is being recovered
+   */
+  synchronized boolean isUnderRecovery(BlockInfo b) {
+    BlockRecoveryAttempt recoveryAttempt =
+        recoveryTimeouts.getElement(new BlockRecoveryAttempt(b));
+    return recoveryAttempt != null;
+  }
+
+  long getTime() {
+    return Time.monotonicNow();
+  }
+
+  @VisibleForTesting
+  synchronized void setRecoveryTimeoutInterval(long recoveryTimeoutInterval) {
+    this.recoveryTimeoutInterval = recoveryTimeoutInterval;
+  }
+
+  /**
+   * Tracks timeout for block recovery attempt of a given block.
+   */
+  private static class BlockRecoveryAttempt {
+    private final BlockInfo blockInfo;
+    private long timeoutAt;
+
+    private BlockRecoveryAttempt(BlockInfo blockInfo) {
+      this(blockInfo, 0);
+    }
+
+    BlockRecoveryAttempt(BlockInfo blockInfo, long timeoutAt) {
+      this.blockInfo = blockInfo;
+      this.timeoutAt = timeoutAt;
+    }
+
+    boolean hasTimedOut(long currentTime) {
+      return currentTime > timeoutAt;
+    }
+
+    void setTimeout(long newTimeoutAt) {
+      this.timeoutAt = newTimeoutAt;
+    }
+
+    @Override
+    public int hashCode() {
+      return blockInfo.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof BlockRecoveryAttempt) {
+        return this.blockInfo.equals(((BlockRecoveryAttempt) obj).blockInfo);
+      }
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae3dc443/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index e21da7f..50e9420 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3240,25 +3240,30 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             + "Removed empty last block and closed file " + src);
         return true;
       }
-      // start recovery of the last block for this file
-      long blockRecoveryId = nextGenerationStamp(
-          blockManager.isLegacyBlock(lastBlock));
-      lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
-      if(copyOnTruncate) {
-        lastBlock.setGenerationStamp(blockRecoveryId);
-      } else if(truncateRecovery) {
-        recoveryBlock.setGenerationStamp(blockRecoveryId);
-      }
-      uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
-      leaseManager.renewLease(lease);
-      // Cannot close file right now, since the last block requires recovery.
-      // This may potentially cause infinite loop in lease recovery
-      // if there are no valid replicas on data-nodes.
-      NameNode.stateChangeLog.warn(
-                "DIR* NameSystem.internalReleaseLease: " +
+      // Start recovery of the last block for this file
+      // Only do so if there is no ongoing recovery for this block,
+      // or the previous recovery for this block timed out.
+      if (blockManager.addBlockRecoveryAttempt(lastBlock)) {
+        long blockRecoveryId = nextGenerationStamp(
+            blockManager.isLegacyBlock(lastBlock));
+        if(copyOnTruncate) {
+          lastBlock.setGenerationStamp(blockRecoveryId);
+        } else if(truncateRecovery) {
+          recoveryBlock.setGenerationStamp(blockRecoveryId);
+        }
+        uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
+
+        // Cannot close file right now, since the last block requires recovery.
+        // This may potentially cause infinite loop in lease recovery
+        // if there are no valid replicas on data-nodes.
+        NameNode.stateChangeLog.warn(
+            "DIR* NameSystem.internalReleaseLease: " +
                 "File " + src + " has not been closed." +
-               " Lease recovery is in progress. " +
+                " Lease recovery is in progress. " +
                 "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
+      }
+      lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+      leaseManager.renewLease(lease);
       break;
     }
     return false;
@@ -3541,6 +3546,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         // If this commit does not want to close the file, persist blocks
         FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
       }
+      blockManager.successfulBlockRecovery(storedBlock);
     } finally {
       writeUnlock("commitBlockSynchronization");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae3dc443/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index a43d4d9..be3394e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -3015,6 +3015,16 @@ public class MiniDFSCluster implements AutoCloseable {
     // Wait for new namenode to get registrations from all the datanodes
     waitActive(nnIndex);
   }
+
+  /**
+   * Sets the timeout for re-issuing a block recovery.
+   */
+  public void setBlockRecoveryTimeout(long timeout) {
+    for (int nnIndex = 0; nnIndex < getNumNameNodes(); nnIndex++) {
+      getNamesystem(nnIndex).getBlockManager().setBlockRecoveryTimeout(
+          timeout);
+    }
+  }
   
   protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
                            boolean checkDataNodeAddrConfig) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae3dc443/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
new file mode 100644
index 0000000..baad89f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class contains unit tests for PendingRecoveryBlocks.java functionality.
+ */
+public class TestPendingRecoveryBlocks {
+
+  private PendingRecoveryBlocks pendingRecoveryBlocks;
+  private final long recoveryTimeout = 1000L;
+
+  private final BlockInfo blk1 = getBlock(1);
+  private final BlockInfo blk2 = getBlock(2);
+  private final BlockInfo blk3 = getBlock(3);
+
+  @Before
+  public void setUp() {
+    pendingRecoveryBlocks =
+        Mockito.spy(new PendingRecoveryBlocks(recoveryTimeout));
+  }
+
+  BlockInfo getBlock(long blockId) {
+    return new BlockInfoContiguous(new Block(blockId), (short) 0);
+  }
+
+  @Test
+  public void testAddDifferentBlocks() {
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+    assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk1));
+    assertTrue(pendingRecoveryBlocks.add(blk2));
+    assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk2));
+    assertTrue(pendingRecoveryBlocks.add(blk3));
+    assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk3));
+  }
+
+  @Test
+  public void testAddAndRemoveBlocks() {
+    // Add blocks
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+    assertTrue(pendingRecoveryBlocks.add(blk2));
+
+    // Remove blk1
+    pendingRecoveryBlocks.remove(blk1);
+
+    // Adding back blk1 should succeed
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+  }
+
+  @Test
+  public void testAddBlockWithPreviousRecoveryTimedOut() {
+    // Add blk
+    Mockito.doReturn(0L).when(pendingRecoveryBlocks).getTime();
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+
+    // Should fail, has not timed out yet
+    Mockito.doReturn(recoveryTimeout / 2).when(pendingRecoveryBlocks).getTime();
+    assertFalse(pendingRecoveryBlocks.add(blk1));
+
+    // Should succeed after timing out
+    Mockito.doReturn(recoveryTimeout * 2).when(pendingRecoveryBlocks).getTime();
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae3dc443/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index ab1ad48..1aa538d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -18,7 +18,10 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -41,6 +44,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -64,7 +68,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
@@ -90,6 +93,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
@@ -981,4 +985,106 @@ public class TestBlockRecovery {
       Assert.fail("Thread failure: " + failureReason);
     }
   }
+
+  /**
+   * Test for block recovery taking longer than the heartbeat interval.
+   */
+  @Test(timeout = 300000L)
+  public void testRecoverySlowerThanHeartbeat() throws Exception {
+    tearDown(); // Stop the Mocked DN started in startup()
+
+    SleepAnswer delayer = new SleepAnswer(3000, 6000);
+    testRecoveryWithDatanodeDelayed(delayer);
+  }
+
+  /**
+   * Test for block recovery timeout. All recovery attempts will be delayed
+   * and the first attempt will be lost to trigger recovery timeout and retry.
+   */
+  @Test(timeout = 300000L)
+  public void testRecoveryTimeout() throws Exception {
+    tearDown(); // Stop the Mocked DN started in startup()
+    final Random r = new Random();
+
+    // Make sure first commitBlockSynchronization call from the DN gets lost
+    // for the recovery timeout to expire and new recovery attempt
+    // to be started.
+    SleepAnswer delayer = new SleepAnswer(3000) {
+      private final AtomicBoolean callRealMethod = new AtomicBoolean();
+
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        boolean interrupted = false;
+        try {
+          Thread.sleep(r.nextInt(3000) + 6000);
+        } catch (InterruptedException ie) {
+          interrupted = true;
+        }
+        try {
+          if (callRealMethod.get()) {
+            return invocation.callRealMethod();
+          }
+          callRealMethod.set(true);
+          return null;
+        } finally {
+          if (interrupted) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    };
+    testRecoveryWithDatanodeDelayed(delayer);
+  }
+
+  private void testRecoveryWithDatanodeDelayed(
+      GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
+    Configuration configuration = new HdfsConfiguration();
+    configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(configuration)
+          .numDataNodes(2).build();
+      cluster.waitActive();
+      final FSNamesystem ns = cluster.getNamesystem();
+      final NameNode nn = cluster.getNameNode();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      cluster.setBlockRecoveryTimeout(TimeUnit.SECONDS.toMillis(15));
+
+      // Create a file and never close the output stream to trigger recovery
+      FSDataOutputStream out = dfs.create(new Path("/testSlowRecovery"),
+          (short) 2);
+      out.write(AppendTestUtil.randomBytes(0, 4096));
+      out.hsync();
+
+      List<DataNode> dataNodes = cluster.getDataNodes();
+      for (DataNode datanode : dataNodes) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(datanode, nn);
+
+        Mockito.doAnswer(recoveryDelayer).when(nnSpy).
+            commitBlockSynchronization(
+                Mockito.any(ExtendedBlock.class), Mockito.anyInt(),
+                Mockito.anyLong(), Mockito.anyBoolean(),
+                Mockito.anyBoolean(), Mockito.any(DatanodeID[].class),
+                Mockito.any(String[].class));
+      }
+
+      // Make sure hard lease expires to trigger replica recovery
+      cluster.setLeasePeriod(100L, 100L);
+
+      // Wait for recovery to succeed
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return ns.getCompleteBlocksTotal() > 0;
+        }
+      }, 300, 300000);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae3dc443/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
index b93004c..dbc264e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
@@ -274,7 +275,7 @@ public class TestPipelinesFailover {
     // Disable permissions so that another user can recover the lease.
     conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    
+
     FSDataOutputStream stm = null;
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
@@ -283,6 +284,7 @@ public class TestPipelinesFailover {
     try {
       cluster.waitActive();
       cluster.transitionToActive(0);
+      cluster.setBlockRecoveryTimeout(TimeUnit.SECONDS.toMillis(1));
       Thread.sleep(500);
 
       LOG.info("Starting with NN 0 active");


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org