You are viewing a plain-text version of this content. The link to its canonical (original) version was lost in this plain-text rendering.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/02/22 21:37:28 UTC

svn commit: r1292497 - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: CHANGES.HDFS-1623.txt src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

Author: todd
Date: Wed Feb 22 20:37:28 2012
New Revision: 1292497

URL: http://svn.apache.org/viewvc?rev=1292497&view=rev
Log:
HDFS-2972. Small optimization building incremental block report. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1292497&r1=1292496&r2=1292497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Wed Feb 22 20:37:28 2012
@@ -222,3 +222,5 @@ HDFS-2952. NN should not start with upgr
 HDFS-2974. MiniDFSCluster does not delete standby NN name dirs during format. (atm)
 
 HDFS-2929. Stress test and fixes for block synchronization (todd)
+
+HDFS-2972. Small optimization building incremental block report (todd)

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1292497&r1=1292496&r2=1292497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Wed Feb 22 20:37:28 2012
@@ -24,7 +24,8 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.util.Collection;
-import java.util.LinkedList;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -54,6 +55,7 @@ import org.apache.hadoop.ipc.RemoteExcep
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
 
 /**
  * A thread per active or standby namenode to perform:
@@ -81,8 +83,16 @@ class BPServiceActor implements Runnable
   DatanodeProtocolClientSideTranslatorPB bpNamenode;
   private long lastHeartbeat = 0;
   private volatile boolean initialized = false;
-  private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList 
-    = new LinkedList<ReceivedDeletedBlockInfo>();
+  
+  /**
+   * Between block reports (which happen on the order of once an hour) the
+   * DN reports smaller incremental changes to its block list. This map,
+   * keyed by block ID, contains the pending changes which have yet to be
+   * reported to the NN. Access should be synchronized on this object.
+   */
+  private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR 
+    = Maps.newHashMap();
+  
   private volatile int pendingReceivedRequests = 0;
   private volatile boolean shouldServiceRun = true;
   private final DataNode dn;
@@ -242,28 +252,39 @@ class BPServiceActor implements Runnable
 
     // check if there are newly received blocks
     ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
-    int currentReceivedRequestsCounter;
-    synchronized (receivedAndDeletedBlockList) {
-      currentReceivedRequestsCounter = pendingReceivedRequests;
-      int numBlocks = receivedAndDeletedBlockList.size();
+    synchronized (pendingIncrementalBR) {
+      int numBlocks = pendingIncrementalBR.size();
       if (numBlocks > 0) {
         //
         // Send newly-received and deleted blockids to namenode
         //
-        receivedAndDeletedBlockArray = receivedAndDeletedBlockList
-            .toArray(new ReceivedDeletedBlockInfo[numBlocks]);
+        receivedAndDeletedBlockArray = pendingIncrementalBR
+            .values().toArray(new ReceivedDeletedBlockInfo[numBlocks]);
       }
+      pendingIncrementalBR.clear();
     }
     if (receivedAndDeletedBlockArray != null) {
       StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
           bpRegistration.getStorageID(), receivedAndDeletedBlockArray) };
-      bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
-          report);
-      synchronized (receivedAndDeletedBlockList) {
-        for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
-          receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
+      boolean success = false;
+      try {
+        bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
+            report);
+        success = true;
+      } finally {
+        synchronized (pendingIncrementalBR) {
+          if (!success) {
+            // If we didn't succeed in sending the report, put all of the
+            // blocks back onto our queue, but only in the case where we didn't
+            // put something newer in the meantime.
+            for (ReceivedDeletedBlockInfo rdbi : receivedAndDeletedBlockArray) {
+              if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
+                pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
+              }
+            }
+          }
+          pendingReceivedRequests = pendingIncrementalBR.size();
         }
-        pendingReceivedRequests -= currentReceivedRequestsCounter;
       }
     }
   }
@@ -274,16 +295,18 @@ class BPServiceActor implements Runnable
    * client? For now we don't.
    */
   void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo bInfo) {
-    synchronized (receivedAndDeletedBlockList) {
-      receivedAndDeletedBlockList.add(bInfo);
+    synchronized (pendingIncrementalBR) {
+      pendingIncrementalBR.put(
+          bInfo.getBlock().getBlockId(), bInfo);
       pendingReceivedRequests++;
-      receivedAndDeletedBlockList.notifyAll();
+      pendingIncrementalBR.notifyAll();
     }
   }
 
   void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) {
-    synchronized (receivedAndDeletedBlockList) {
-      receivedAndDeletedBlockList.add(bInfo);
+    synchronized (pendingIncrementalBR) {
+      pendingIncrementalBR.put(
+          bInfo.getBlock().getBlockId(), bInfo);
     }
   }
 
@@ -292,13 +315,13 @@ class BPServiceActor implements Runnable
    */
   @VisibleForTesting
   void triggerBlockReportForTests() throws IOException {
-    synchronized (receivedAndDeletedBlockList) {
+    synchronized (pendingIncrementalBR) {
       lastBlockReport = 0;
       lastHeartbeat = 0;
-      receivedAndDeletedBlockList.notifyAll();
+      pendingIncrementalBR.notifyAll();
       while (lastBlockReport == 0) {
         try {
-          receivedAndDeletedBlockList.wait(100);
+          pendingIncrementalBR.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -308,12 +331,12 @@ class BPServiceActor implements Runnable
   
   @VisibleForTesting
   void triggerHeartbeatForTests() throws IOException {
-    synchronized (receivedAndDeletedBlockList) {
+    synchronized (pendingIncrementalBR) {
       lastHeartbeat = 0;
-      receivedAndDeletedBlockList.notifyAll();
+      pendingIncrementalBR.notifyAll();
       while (lastHeartbeat == 0) {
         try {
-          receivedAndDeletedBlockList.wait(100);
+          pendingIncrementalBR.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -323,13 +346,13 @@ class BPServiceActor implements Runnable
 
   @VisibleForTesting
   void triggerDeletionReportForTests() throws IOException {
-    synchronized (receivedAndDeletedBlockList) {
+    synchronized (pendingIncrementalBR) {
       lastDeletedReport = 0;
-      receivedAndDeletedBlockList.notifyAll();
+      pendingIncrementalBR.notifyAll();
 
       while (lastDeletedReport == 0) {
         try {
-          receivedAndDeletedBlockList.wait(100);
+          pendingIncrementalBR.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -527,10 +550,10 @@ class BPServiceActor implements Runnable
         //
         long waitTime = dnConf.heartBeatInterval - 
         (System.currentTimeMillis() - lastHeartbeat);
-        synchronized(receivedAndDeletedBlockList) {
+        synchronized(pendingIncrementalBR) {
           if (waitTime > 0 && pendingReceivedRequests == 0) {
             try {
-              receivedAndDeletedBlockList.wait(waitTime);
+              pendingIncrementalBR.wait(waitTime);
             } catch (InterruptedException ie) {
               LOG.warn("BPOfferService for " + this + " interrupted");
             }