Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2008/09/17 07:07:01 UTC

svn commit: r696149 - in /hadoop/core/trunk: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java

Author: szetszwo
Date: Tue Sep 16 22:07:00 2008
New Revision: 696149

URL: http://svn.apache.org/viewvc?rev=696149&view=rev
Log:
HADOOP-3623. Refactor LeaseManager. (szetszwo)
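
This commit moves the block-recovery code (the BlockRecord helper, the
ongoingRecovery map, and the recoverBlocks/recoverBlock/syncBlock methods)
from the namenode-side LeaseManager into DataNode as instance methods, so
DataNode no longer imports LeaseManager and recovery no longer threads the
datanode, namenode, and conf through static parameters. A minimal toy sketch
of the shape of the change follows; the classes below are stand-ins, not the
real Hadoop types.

// Toy model only: Conf, Namenode, DataNode, and LeaseManager here are
// stand-ins, not the real org.apache.hadoop classes.
class RefactorSketch {
  static class Namenode {}
  static class Conf {}

  static class DataNode {
    final Namenode namenode = new Namenode();  // collaborators held as fields
    final Conf conf = new Conf();

    // After: an instance method; namenode and conf come from the fields.
    String recoverBlock(String block, boolean closeFile) {
      return "recovered " + block;
    }
  }

  static class LeaseManager {
    // Before: a static helper with every collaborator passed in.
    static String recoverBlock(String block, DataNode dn, Namenode nn,
        Conf conf, boolean closeFile) {
      return dn.recoverBlock(block, closeFile);
    }
  }

  public static void main(String[] args) {
    DataNode dn = new DataNode();
    System.out.println(LeaseManager.recoverBlock(      // old call site
        "blk_1", dn, dn.namenode, dn.conf, false));
    System.out.println(dn.recoverBlock("blk_1", false));  // new call site
  }
}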

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=696149&r1=696148&r2=696149&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Sep 16 22:07:00 2008
@@ -577,9 +577,11 @@
     HADOOP-4139. Optimize Hive multi group-by.
     (Namin Jain via dhruba)
 
-		HADOOP-3911. Add a check to fsck options to make sure -files is not 
-		the first option to resolve conflicts with GenericOptionsParser
-		(lohit)
+    HADOOP-3911. Add a check to fsck options to make sure -files is not 
+    the first option to resolve conflicts with GenericOptionsParser
+    (lohit)
+
+    HADOOP-3623. Refactor LeaseManager. (szetszwo)
 
 Release 0.18.1 - 2008-09-17
 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=696149&r1=696148&r2=696149&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Sep 16 22:07:00 2008
@@ -33,8 +33,11 @@
 import java.security.SecureRandom;
 import java.util.AbstractList;
 import java.util.ArrayList;
-import java.util.Formatter;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Semaphore;
 
@@ -50,11 +53,11 @@
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@@ -142,6 +145,8 @@
 
   volatile boolean shouldRun = true;
   private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
+  /** list of blocks being recovered */
+  private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
   private LinkedList<String> delHints = new LinkedList<String>();
   public final static String EMPTY_DEL_HINT = "";
   int xmitsInProgress = 0;
@@ -1319,8 +1324,15 @@
 
   public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
     Daemon d = new Daemon(threadGroup, new Runnable() {
+      /** Recover a list of blocks. It is run by the primary datanode. */
       public void run() {
-        LeaseManager.recoverBlocks(blocks, targets, DataNode.this, namenode, getConf());
+        for(int i = 0; i < blocks.length; i++) {
+          try {
+            recoverBlock(blocks[i], targets[i], true);
+          } catch (IOException e) {
+            LOG.warn("recoverBlocks, i=" + i, e);
+          }
+        }
       }
     });
     d.start();
@@ -1353,6 +1365,129 @@
         + ": " + protocol);
   }
 
+  /** A convenient class used in lease recovery */
+  private static class BlockRecord { 
+    final DatanodeID id;
+    final InterDatanodeProtocol datanode;
+    final Block block;
+    
+    BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
+      this.id = id;
+      this.datanode = datanode;
+      this.block = block;
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      return "block:" + block + " node:" + id;
+    }
+  }
+
+  /** Recover a block */
+  private LocatedBlock recoverBlock(Block block, DatanodeID[] datanodeids,
+      boolean closeFile) throws IOException {
+
+    // If the block is already being recovered, then skip recovering it.
+    // This can happen if the namenode and client start recovering the same
+    // file at the same time.
+    synchronized (ongoingRecovery) {
+      Block tmp = new Block();
+      tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
+      if (ongoingRecovery.get(tmp) != null) {
+        String msg = "Block " + block + " is already being recovered, " +
+                     "ignoring this request to recover it.";
+        LOG.info(msg);
+        throw new IOException(msg);
+      }
+      ongoingRecovery.put(block, block);
+    }
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("block=" + block
+            + ", datanodeids=" + Arrays.asList(datanodeids));
+      }
+      List<BlockRecord> syncList = new ArrayList<BlockRecord>();
+      long minlength = Long.MAX_VALUE;
+      int errorCount = 0;
+
+      //check generation stamps
+      for(DatanodeID id : datanodeids) {
+        try {
+          InterDatanodeProtocol datanode = dnRegistration.equals(id)?
+              this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
+          BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
+          if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
+            syncList.add(new BlockRecord(id, datanode, new Block(info)));
+            if (info.getNumBytes() < minlength) {
+              minlength = info.getNumBytes();
+            }
+          }
+        } catch (IOException e) {
+          ++errorCount;
+          InterDatanodeProtocol.LOG.warn(
+              "Failed to getBlockMetaDataInfo for block (=" + block 
+              + ") from datanode (=" + id + ")", e);
+        }
+      }
+
+      if (syncList.isEmpty() && errorCount > 0) {
+        throw new IOException("All datanodes failed: block=" + block
+            + ", datanodeids=" + Arrays.asList(datanodeids));
+      }
+      return syncBlock(block, minlength, syncList, closeFile);
+    } finally {
+      synchronized (ongoingRecovery) {
+        ongoingRecovery.remove(block);
+      }
+    }
+  }
+
+  /** Block synchronization */
+  private LocatedBlock syncBlock(Block block, long minlength,
+      List<BlockRecord> syncList, boolean closeFile) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("block=" + block + ", minlength=" + minlength
+          + ", syncList=" + syncList + ", closeFile=" + closeFile);
+    }
+
+    //syncList.isEmpty() means that all the datanodes do not have the block,
+    //so the block can be deleted.
+    if (syncList.isEmpty()) {
+      namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
+          DatanodeID.EMPTY_ARRAY);
+      return null;
+    }
+
+    List<DatanodeID> successList = new ArrayList<DatanodeID>();
+
+    long generationstamp = namenode.nextGenerationStamp(block);
+    Block newblock = new Block(block.getBlockId(), minlength, generationstamp);
+
+    for(BlockRecord r : syncList) {
+      try {
+        r.datanode.updateBlock(r.block, newblock, closeFile);
+        successList.add(r.id);
+      } catch (IOException e) {
+        InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+            + newblock + ", datanode=" + r.id + ")", e);
+      }
+    }
+
+    if (!successList.isEmpty()) {
+      DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
+
+      namenode.commitBlockSynchronization(block,
+          newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
+          nlist);
+      DatanodeInfo[] info = new DatanodeInfo[nlist.length];
+      for (int i = 0; i < nlist.length; i++) {
+        info[i] = new DatanodeInfo(nlist[i]);
+      }
+      return new LocatedBlock(newblock, info); // success
+    }
+    return null; // failed
+  }
+  
   // ClientDataNodeProtocol implementation
   /** {@inheritDoc} */
   public LocatedBlock recoverBlock(Block block, DatanodeInfo[] targets
@@ -1366,7 +1501,6 @@
     }
     LOG.info("Client invoking recoverBlock for block " + block +
              " on datanodes " + msg.toString());
-    return LeaseManager.recoverBlock(block, targets, this, namenode, 
-                                     getConf(), false);
+    return recoverBlock(block, targets, false);
   }
 }
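
The ongoingRecovery map added above guards against two recoveries of the
same block running at once, which can happen when the namenode and a client
both start recovering the same file. The pattern is check-then-register
under the map's lock, do the work outside the lock, and always unregister
in a finally block. A self-contained sketch of that pattern follows;
RecoveryGuard and doRecovery are illustrative names, not real HDFS
identifiers.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class RecoveryGuard {
  private final Map<Long, Long> ongoing = new HashMap<Long, Long>();

  public void recover(long blockId) throws IOException {
    // Check-then-register under the map's lock: a second request for a
    // block that is already being recovered fails fast, as in the patch.
    synchronized (ongoing) {
      if (ongoing.containsKey(blockId)) {
        throw new IOException("Block " + blockId
            + " is already being recovered");
      }
      ongoing.put(blockId, blockId);
    }
    try {
      doRecovery(blockId);          // the actual work runs outside the lock
    } finally {
      synchronized (ongoing) {
        ongoing.remove(blockId);    // always release the guard
      }
    }
  }

  private void doRecovery(long blockId) {
    System.out.println("recovering block " + blockId);
  }

  public static void main(String[] args) throws IOException {
    new RecoveryGuard().recover(42L);
  }
}

In the patch itself the lookup key is built with
GenerationStamp.WILDCARD_STAMP, so two requests for the same block id
collide even when their generation stamps differ.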

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=696149&r1=696148&r2=696149&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Tue Sep 16 22:07:00 2008
@@ -18,23 +18,20 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 
 /**
  * LeaseManager does the lease housekeeping for writing on files.   
@@ -91,10 +88,6 @@
   /** @return the lease containing src */
   public Lease getLeaseByPath(String src) {return sortedLeasesByPath.get(src);}
 
-  /** list of blocks being recovered */
-  private static Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
-
-
   /** @return the number of leases currently in the system */
   public synchronized int countLease() {return sortedLeases.size();}
 
@@ -390,146 +383,4 @@
         + "\n sortedLeasesByPath=" + sortedLeasesByPath
         + "\n}";
   }
-
-  /*
-   * The following codes provides useful static methods for lease recovery.
-   */
-  /** A convenient class used in lease recovery */
-  private static class BlockRecord { 
-    final DatanodeID id;
-    final InterDatanodeProtocol datanode;
-    final Block block;
-    
-    BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
-      this.id = id;
-      this.datanode = datanode;
-      this.block = block;
-    }
-
-    public String toString() {
-      return "block:" + block + " node:" + id;
-    }
-  }
-
-  /**
-   * Recover a list of blocks.
-   * This method is invoked by the primary datanode.
-   */
-  public static void recoverBlocks(Block[] blocks, DatanodeID[][] targets,
-      DataNode primary, DatanodeProtocol namenode, Configuration conf) {
-    for(int i = 0; i < blocks.length; i++) {
-      try {
-        recoverBlock(blocks[i], targets[i], primary, namenode, conf, true);
-      } catch (IOException e) {
-        LOG.warn("recoverBlocks, i=" + i, e);
-      }
-    }
-  }
-
-  /** Recover a block */
-  public static LocatedBlock recoverBlock(Block block, DatanodeID[] datanodeids,
-      DataNode primary, DatanodeProtocol namenode, Configuration conf,
-      boolean closeFile) throws IOException {
-
-    // If the block is already being recovered, then skip recovering it.
-    // This can happen if the namenode and client start recovering the same
-    // file at the same time.
-    synchronized (ongoingRecovery) {
-      Block tmp = new Block();
-      tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
-      if (ongoingRecovery.get(tmp) != null) {
-        String msg = "Block " + block + " is already being recovered, " +
-                     " ignoring this request to recover it.";
-        LOG.info(msg);
-        throw new IOException(msg);
-      }
-      ongoingRecovery.put(block, block);
-    }
-    try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("block=" + block
-            + ", datanodeids=" + Arrays.asList(datanodeids));
-      }
-      List<BlockRecord> syncList = new ArrayList<BlockRecord>();
-      long minlength = Long.MAX_VALUE;
-      int errorCount = 0;
-
-      //check generation stamps
-      for(DatanodeID id : datanodeids) {
-        try {
-          InterDatanodeProtocol datanode = primary.dnRegistration.equals(id)?
-              primary: DataNode.createInterDataNodeProtocolProxy(id, conf);
-          BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
-          if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
-            syncList.add(new BlockRecord(id, datanode, new Block(info)));
-            if (info.getNumBytes() < minlength) {
-              minlength = info.getNumBytes();
-            }
-          }
-        } catch (IOException e) {
-          ++errorCount;
-          InterDatanodeProtocol.LOG.warn(
-              "Failed to getBlockMetaDataInfo for block (=" + block 
-              + ") from datanode (=" + id + ")", e);
-        }
-      }
-
-      if (syncList.isEmpty() && errorCount > 0) {
-        throw new IOException("All datanodes failed: block=" + block
-            + ", datanodeids=" + Arrays.asList(datanodeids));
-      }
-      return syncBlock(block, minlength, syncList, namenode, closeFile);
-    } finally {
-      synchronized (ongoingRecovery) {
-        ongoingRecovery.remove(block);
-      }
-    }
-  }
-
-  /** Block synchronization */
-  private static LocatedBlock syncBlock(Block block, long minlength,
-      List<BlockRecord> syncList, DatanodeProtocol namenode,
-      boolean closeFile) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("block=" + block + ", minlength=" + minlength
-          + ", syncList=" + syncList + ", closeFile=" + closeFile);
-    }
-
-    //syncList.isEmpty() that all datanodes do not have the block
-    //so the block can be deleted.
-    if (syncList.isEmpty()) {
-      namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
-          DatanodeID.EMPTY_ARRAY);
-      return null;
-    }
-
-    List<DatanodeID> successList = new ArrayList<DatanodeID>();
-
-    long generationstamp = namenode.nextGenerationStamp(block);
-    Block newblock = new Block(block.getBlockId(), minlength, generationstamp);
-
-    for(BlockRecord r : syncList) {
-      try {
-        r.datanode.updateBlock(r.block, newblock, closeFile);
-        successList.add(r.id);
-      } catch (IOException e) {
-        InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
-            + newblock + ", datanode=" + r.id + ")", e);
-      }
-    }
-
-    if (!successList.isEmpty()) {
-      DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
-
-      namenode.commitBlockSynchronization(block,
-          newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
-          nlist);
-      DatanodeInfo[] info = new DatanodeInfo[nlist.length];
-      for (int i = 0; i < nlist.length; i++) {
-        info[i] = new DatanodeInfo(nlist[i]);
-      }
-      return new LocatedBlock(newblock, info); // success
-    }
-    return null; // failed
-  }
 }
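
For reference, the syncBlock() method that moved to DataNode implements a
small consensus step: among the replicas whose generation stamp is current,
it truncates every replica to the shortest reported length and assigns a
fresh generation stamp obtained from the namenode, so that any replica that
misses the update is recognizably stale afterwards. A simplified,
self-contained model of that computation follows; SyncBlockSketch and
nextStamp are illustrative names, and the namenode.nextGenerationStamp()
RPC is replaced by a plain counter.

import java.util.Arrays;
import java.util.List;

public class SyncBlockSketch {
  private static long nextStamp = 1000;  // stand-in for the namenode RPC

  /** Returns {newLength, newStamp}, or null when no replica survives. */
  static long[] syncBlock(List<Long> replicaLengths) {
    if (replicaLengths.isEmpty()) {
      return null;  // the real code asks the namenode to delete the block
    }
    // Truncate every replica to the shortest reported length ...
    long minLength = Long.MAX_VALUE;
    for (long len : replicaLengths) {
      minLength = Math.min(minLength, len);
    }
    // ... and stamp the survivors with a fresh generation stamp.
    return new long[] { minLength, ++nextStamp };
  }

  public static void main(String[] args) {
    long[] r = syncBlock(Arrays.asList(4096L, 2048L, 3072L));
    System.out.println("length=" + r[0] + " stamp=" + r[1]);
  }
}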