Posted to common-commits@hadoop.apache.org by aw...@apache.org on 2015/06/30 20:52:31 UTC

[14/30] hadoop git commit: HDFS-8646. Prune cached replicas from DatanodeDescriptor state on replica invalidation.

HDFS-8646. Prune cached replicas from DatanodeDescriptor state on replica invalidation.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/afe9ea3c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/afe9ea3c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/afe9ea3c

Branch: refs/heads/HADOOP-12111
Commit: afe9ea3c12e1f5a71922400eadb642960bc87ca1
Parents: 4c659dd
Author: Andrew Wang <wa...@apache.org>
Authored: Wed Jun 24 14:42:33 2015 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Wed Jun 24 14:42:33 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +++
 .../server/blockmanagement/BlockManager.java    | 14 +++++++++++
 .../hdfs/server/datanode/BPServiceActor.java    |  6 +++--
 .../hadoop/hdfs/server/datanode/DataNode.java   | 17 +++++++++++--
 .../hdfs/server/namenode/CacheManager.java      | 24 ++++++++++++++++---
 .../hdfs/server/namenode/FSNamesystem.java      |  1 +
 .../hadoop/hdfs/server/namenode/Namesystem.java | 18 +++++++-------
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     | 18 ++++++++++++++
 .../hdfs/server/datanode/DataNodeTestUtils.java | 11 +++++++++
 .../server/namenode/TestCacheDirectives.java    | 25 ++++++++++++++++++++
 10 files changed, 122 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
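
For context, the problem this patch fixes: when a replica is invalidated
(for example after a replication-factor decrease), the NameNode previously
kept that replica on the DatanodeDescriptor's pendingCached, cached, and
pendingUncached lists until the DataNode's next cache report arrived, so
getBlockLocations could advertise cache locations that had no backing
replica. The patch prunes those lists at invalidation time and adds a
defensive filter on the read path.

A minimal client-side sketch of the scenario (not part of the patch; the
cluster URI and paths are placeholders). It mirrors the regression test at
the bottom of this diff: cache every replica of a file, then lower its
replication so some cached replicas lose their backing block.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
    import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

    public class CachePruneScenario {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedFileSystem dfs = (DistributedFileSystem)
            new Path("hdfs://namenode:8020/").getFileSystem(conf);
        Path file = new Path("/demo/cached-file");
        // Cache all three replicas of the file.
        dfs.addCachePool(new CachePoolInfo("pool"));
        dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
            .setPool("pool")
            .setPath(file)
            .setReplication((short) 3)
            .build());
        // Invalidate two replicas. Before this patch, the NameNode kept
        // stale cache locations for them until the next cache report.
        dfs.setReplication(file, (short) 1);
      }
    }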


http://git-wip-us.apache.org/repos/asf/hadoop/blob/afe9ea3c/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d94a213..4268154 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -949,6 +949,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8542. WebHDFS getHomeDirectory behavior does not match specification.
     (Kanaka Kumar Avvaru via jghoman)
 
+    HDFS-8646. Prune cached replicas from DatanodeDescriptor state on replica
+    invalidation. (wang)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afe9ea3c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7d3a678..368d3b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBloc
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -3108,6 +3109,19 @@ public class BlockManager {
         return;
       }
 
+      CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
+          .get(new CachedBlock(block.getBlockId(), (short) 0, false));
+      if (cblock != null) {
+        boolean removed = false;
+        removed |= node.getPendingCached().remove(cblock);
+        removed |= node.getCached().remove(cblock);
+        removed |= node.getPendingUncached().remove(cblock);
+        if (removed) {
+          blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching "
+              + "related lists on node {}", block, node);
+        }
+      }
+
       //
       // It's possible that the block was removed because of a datanode
       // failure. If the block is still valid, check if replication is

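A note on the lookup idiom in this hunk: the cachedBlocks collection is
looked up by block ID alone, so the code constructs a throwaway CachedBlock
with dummy replication ((short) 0) and mark (false) arguments purely as a
search key for the canonical entry. A simplified, hypothetical illustration
of the same pattern (not CachedBlock itself):

    import java.util.HashMap;
    import java.util.Map;

    public class SearchKeyDemo {
      static final class Key {
        final long id;
        final int dummy;  // ignored by equals/hashCode, like CachedBlock's
                          // replication and mark fields
        Key(long id, int dummy) { this.id = id; this.dummy = dummy; }
        @Override public boolean equals(Object o) {
          return o instanceof Key && ((Key) o).id == id;
        }
        @Override public int hashCode() { return (int) (id ^ (id >>> 32)); }
      }

      public static void main(String[] args) {
        Map<Key, String> entries = new HashMap<Key, String>();
        entries.put(new Key(42L, 7), "canonical entry");
        // A key with different dummy state still finds the entry.
        System.out.println(entries.get(new Key(42L, 0)));
      }
    }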
http://git-wip-us.apache.org/repos/asf/hadoop/blob/afe9ea3c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index f84dd99..1817427 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -709,8 +709,10 @@ class BPServiceActor implements Runnable {
         }
         processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
 
-        DatanodeCommand cmd = cacheReport();
-        processCommand(new DatanodeCommand[]{ cmd });
+        if (!dn.areCacheReportsDisabledForTests()) {
+          DatanodeCommand cmd = cacheReport();
+          processCommand(new DatanodeCommand[]{ cmd });
+        }
 
         //
         // There is no work to do;  sleep until heartbeat timer elapses,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afe9ea3c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 6c8cf2b..e265dad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -301,6 +301,7 @@ public class DataNode extends ReconfigurableBase
   ThreadGroup threadGroup = null;
   private DNConf dnConf;
   private volatile boolean heartbeatsDisabledForTests = false;
+  private volatile boolean cacheReportsDisabledForTests = false;
   private DataStorage storage = null;
 
   private DatanodeHttpServer httpServer = null;
@@ -1055,15 +1056,27 @@ public class DataNode extends ReconfigurableBase
 
   
   // used only for testing
+  @VisibleForTesting
   void setHeartbeatsDisabledForTests(
       boolean heartbeatsDisabledForTests) {
     this.heartbeatsDisabledForTests = heartbeatsDisabledForTests;
   }
-  
+
+  @VisibleForTesting
   boolean areHeartbeatsDisabledForTests() {
     return this.heartbeatsDisabledForTests;
   }
-  
+
+  @VisibleForTesting
+  void setCacheReportsDisabledForTest(boolean disabled) {
+    this.cacheReportsDisabledForTests = disabled;
+  }
+
+  @VisibleForTesting
+  boolean areCacheReportsDisabledForTests() {
+    return this.cacheReportsDisabledForTests;
+  }
+
   /**
    * This method starts the data node with the specified conf.
    * 

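The new flag follows the same pattern as heartbeatsDisabledForTests: it is
declared volatile because a test thread flips it while the BPServiceActor
loop (see the BPServiceActor.java hunk above) reads it, and volatile
guarantees visibility across those threads without locking. A minimal
sketch of the pattern, with hypothetical names:

    public class ToggleDemo {
      private volatile boolean reportsDisabled = false;

      // Worker loop: skips reporting while a test holds the flag up.
      void workerLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
          if (!reportsDisabled) {
            sendReport();
          }
          Thread.sleep(1000);
        }
      }

      void sendReport() { /* send a cache report */ }

      // Called from the test thread; a single volatile write needs no lock.
      void setReportsDisabled(boolean disabled) {
        reportsDisabled = disabled;
      }
    }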
http://git-wip-us.apache.org/repos/asf/hadoop/blob/afe9ea3c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index e5270ad..e09ba32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
@@ -902,9 +903,26 @@ public final class CacheManager {
     if (cachedBlock == null) {
       return;
     }
-    List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED);
-    for (DatanodeDescriptor datanode : datanodes) {
-      block.addCachedLoc(datanode);
+    List<DatanodeDescriptor> cachedDNs = cachedBlock.getDatanodes(Type.CACHED);
+    for (DatanodeDescriptor datanode : cachedDNs) {
+      // Filter out cached blocks that do not have a backing replica.
+      //
+      // This should not happen since it means the CacheManager thinks
+      // something is cached that does not exist, but it's a safety
+      // measure.
+      boolean found = false;
+      for (DatanodeInfo loc : block.getLocations()) {
+        if (loc.equals(datanode)) {
+          block.addCachedLoc(loc);
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        LOG.warn("Datanode {} is not a valid cache location for block {} "
+            + "because that node does not have a backing replica!",
+            datanode, block.getBlock().getBlockName());
+      }
     }
   }
 

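This filter is defense in depth for the read path: even though the
BlockManager change above prunes cache state on invalidation, the method
now also refuses to advertise a cached location unless the same node
appears among the block's replica locations, and it logs a warning when
the two disagree. A hypothetical standalone sketch of the same
intersection check:

    import java.util.ArrayList;
    import java.util.List;

    public class BackingReplicaFilter {
      // Keep only cache locations that also hold a backing replica.
      public static List<String> validCacheLocations(
          List<String> cachedNodes, List<String> replicaNodes) {
        List<String> result = new ArrayList<String>();
        for (String node : cachedNodes) {
          if (replicaNodes.contains(node)) {
            result.add(node);
          } else {
            System.err.println("Dropping cache location " + node
                + ": no backing replica");
          }
        }
        return result;
      }
    }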
http://git-wip-us.apache.org/repos/asf/hadoop/blob/afe9ea3c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index d82da93..b073a89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -6460,6 +6460,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     this.dir = dir;
   }
   /** @return the cache manager. */
+  @Override
   public CacheManager getCacheManager() {
     return cacheManager;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afe9ea3c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index 40c4765..1732865 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -29,21 +29,23 @@ import org.apache.hadoop.security.AccessControlException;
 @InterfaceAudience.Private
 public interface Namesystem extends RwLock, SafeMode {
   /** Is this name system running? */
-  public boolean isRunning();
+  boolean isRunning();
 
   /** Check if the user has superuser privilege. */
-  public void checkSuperuserPrivilege() throws AccessControlException;
+  void checkSuperuserPrivilege() throws AccessControlException;
 
   /** @return the block pool ID */
-  public String getBlockPoolId();
+  String getBlockPoolId();
 
-  public boolean isInStandbyState();
+  boolean isInStandbyState();
 
-  public boolean isGenStampInFuture(Block block);
+  boolean isGenStampInFuture(Block block);
 
-  public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
+  void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
 
-  public void checkOperation(OperationCategory read) throws StandbyException;
+  void checkOperation(OperationCategory read) throws StandbyException;
 
-  public boolean isInSnapshot(BlockInfoUnderConstruction blockUC);
+  boolean isInSnapshot(BlockInfoUnderConstruction blockUC);
+
+  CacheManager getCacheManager();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afe9ea3c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index d06b024..96fb669 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FsShell;
@@ -526,6 +527,23 @@ public class DFSTestUtil {
     }
   }
 
+  public static void waitForReplication(final DistributedFileSystem dfs,
+      final Path file, final short replication, int waitForMillis)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          FileStatus stat = dfs.getFileStatus(file);
+          return replication == stat.getReplication();
+        } catch (IOException e) {
+          LOG.info("getFileStatus on path " + file + " failed!", e);
+          return false;
+        }
+      }
+    }, 100, waitForMillis);
+  }
+
   /**
    * Keep accessing the given file until the namenode reports that the
    * given block in the file contains the given number of corrupt replicas.

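The new helper polls getFileStatus on a 100 ms interval via
GenericTestUtils.waitFor and throws TimeoutException if the observed
replication factor never matches. A sketch of it in use (mirroring
testNoBackingReplica at the end of this diff; dfs and file come from the
test's setup):

    void exampleUsage(DistributedFileSystem dfs, Path file)
        throws Exception {
      dfs.setReplication(file, (short) 1);
      // Blocks until getFileStatus reports replication == 1, or throws
      // TimeoutException after 30 seconds.
      DFSTestUtil.waitForReplication(dfs, file, (short) 1, 30000);
    }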
http://git-wip-us.apache.org/repos/asf/hadoop/blob/afe9ea3c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index 9dee724..2f9a3e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -53,6 +54,16 @@ public class DataNodeTestUtils {
     dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests);
   }
 
+  /**
+   * Set if cache reports are disabled for all DNs in a mini cluster.
+   */
+  public static void setCacheReportsDisabledForTests(MiniDFSCluster cluster,
+      boolean disabled) {
+    for (DataNode dn : cluster.getDataNodes()) {
+      dn.setCacheReportsDisabledForTest(disabled);
+    }
+  }
+
   public static void triggerDeletionReport(DataNode dn) throws IOException {
     for (BPOfferService bpos : dn.getAllBpOs()) {
       bpos.triggerDeletionReportForTests();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afe9ea3c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
index 6027934..cf00405 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
@@ -1510,4 +1511,28 @@ public class TestCacheDirectives {
     Thread.sleep(1000);
     checkPendingCachedEmpty(cluster);
   }
+
+  @Test(timeout=60000)
+  public void testNoBackingReplica() throws Exception {
+    // Cache all three replicas for a file.
+    final Path filename = new Path("/noback");
+    final short replication = (short) 3;
+    DFSTestUtil.createFile(dfs, filename, 1, replication, 0x0BAC);
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    dfs.addCacheDirective(
+        new CacheDirectiveInfo.Builder().setPool("pool").setPath(filename)
+            .setReplication(replication).build());
+    waitForCachedBlocks(namenode, 1, replication, "testNoBackingReplica:1");
+    // Pause cache reports while we change the replication factor.
+    // This will orphan some cached replicas.
+    DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, true);
+    try {
+      dfs.setReplication(filename, (short) 1);
+      DFSTestUtil.waitForReplication(dfs, filename, (short) 1, 30000);
+      // The cache locations should drop down to 1 even without cache reports.
+      waitForCachedBlocks(namenode, 1, (short) 1, "testNoBackingReplica:2");
+    } finally {
+      DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, false);
+    }
+  }
 }