You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2008/10/29 22:51:37 UTC

svn commit: r709023 - in /hadoop/core/branches/branch-0.18: CHANGES.txt src/hdfs/org/apache/hadoop/dfs/Balancer.java src/hdfs/org/apache/hadoop/dfs/DataNode.java src/test/org/apache/hadoop/dfs/TestBlockReplacement.java

Author: hairong
Date: Wed Oct 29 14:51:37 2008
New Revision: 709023

URL: http://svn.apache.org/viewvc?rev=709023&view=rev
Log:
HADOOP-4533. HDFS client of hadoop 0.18.1 and HDFS server 0.18.2 (0.18 branch) not compatible. Contributed by Hairong Kuang.

Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java
    hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=709023&r1=709022&r2=709023&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Wed Oct 29 14:51:37 2008
@@ -46,6 +46,10 @@
  
     HADOOP-4526. fsck failing with NullPointerException. (hairong)
 
+    HADOOP-4533. Reverted the patch to HADOOP-4116 to solve the 0.18.1 and
+    0.18.2 incompatibility problem and fixed the balancing semaphore 
+    contention problem without changing the datanode protocol. (hairong)
+
   NEW FEATURES
 
     HADOOP-2421.  Add jdiff output to documentation, listing all API

Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java?rev=709023&r1=709022&r2=709023&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java Wed Oct 29 14:51:37 2008
@@ -174,6 +174,11 @@
     LogFactory.getLog("org.apache.hadoop.dfs.Balancer");
   final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
 
+  /** The maximum number of concurrent block moves for
+   * balancing purposes at a datanode
+   */
+  public static final int MAX_NUM_CONCURRENT_MOVES = 5;
+
   private Configuration conf;
 
   private double threshold = 10D;
@@ -302,9 +307,7 @@
       try {
         sock.connect(DataNode.createSocketAddr(
             proxySource.datanode.getName()), FSConstants.READ_TIMEOUT);
-        long bandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
-        sock.setSoTimeout(2*FSConstants.READ_TIMEOUT+
-            (int)(block.getNumBytes()*1500/bandwidth));
+        sock.setKeepAlive(true);
         out = new DataOutputStream( new BufferedOutputStream(
             sock.getOutputStream(), FSConstants.BUFFER_SIZE));
         sendRequest(out);
@@ -319,11 +322,6 @@
               proxySource.getName() +
               " succeeded." );
         }
-      } catch (SocketTimeoutException te) { 
-        LOG.warn("Timeout moving block "+block.getBlockId()+
-            " from " + source.getName() + " to " +
-            target.getName() + " through " +
-            proxySource.getName());
       } catch (IOException e) {
         LOG.warn("Error moving block "+block.getBlockId()+
             " from " + source.getName() + " to " +
@@ -476,8 +474,6 @@
   /* A class that keeps track of a datanode in Balancer */
   private static class BalancerDatanode implements Writable {
     final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
-    final protected static short MAX_NUM_CONCURRENT_MOVES =
-      DataNode.MAX_BALANCING_THREADS;
     protected DatanodeInfo datanode;
     private double utilization;
     protected long maxSizeToMove;

Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java?rev=709023&r1=709022&r2=709023&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java Wed Oct 29 14:51:37 2008
@@ -146,6 +146,45 @@
   private static final int MAX_XCEIVER_COUNT = 256;
   private int maxXceiverCount = MAX_XCEIVER_COUNT;
   
+  /** A manager that makes sure cluster balancing does not
+   * take too many resources.
+   *
+   * It limits the number of concurrent block moves for balancing and
+   * the total amount of bandwidth they can use.
+   */
+  private static class BlockBalanceThrottler extends Throttler {
+   private int numThreads; // block moves currently holding a slot
+
+   /**Constructor
+    *
+    * @param bandwidth Total amount of bandwidth that can be used for balancing
+    */
+   private BlockBalanceThrottler(long bandwidth) {
+     super(bandwidth);
+     LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
+   }
+
+   /** Check if a block move can start, without waiting for a free slot.
+    *
+    * @return true if the thread quota is not exceeded, in which case
+    * the counter is incremented; false otherwise.
+    */
+   private synchronized boolean acquire() {
+     if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
+       return false;
+     }
+     numThreads++;
+     return true;
+   }
+
+   /** Mark that the move is completed. The thread counter is decremented. */
+   private synchronized void release() {
+     numThreads--;
+   }
+  }
+
+  private BlockBalanceThrottler balancingThrottler;
+
   /**
    * We need an estimate for block size to check if the disk partition has
    * enough space. For now we set it to be the default block size set
@@ -156,12 +195,6 @@
    */
   private long estimateBlockSize;
   
-  // The following three fields are to support balancing
-  final static short MAX_BALANCING_THREADS = 5;
-  private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
-  long balanceBandwidth;
-  private Throttler balancingThrottler;
-
   // For InterDataNodeProtocol
   Server ipcServer;
   
@@ -307,10 +340,8 @@
     this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
     DataNode.nameNodeAddr = nameNodeAddr;
 
-    //set up parameter for cluster balancing
-    this.balanceBandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
-    LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s");
-    this.balancingThrottler = new Throttler(balanceBandwidth);
+    this.balancingThrottler = new BlockBalanceThrottler(
+      conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024));
 
     //initialize periodic block scanner
     String reason = null;
@@ -1364,13 +1395,18 @@
       DatanodeInfo target = new DatanodeInfo(); // read target
       target.readFields(in);
 
+      if (!balancingThrottler.acquire()) { // not able to start
+        LOG.info("Not able to copy block " + blockId + " to "
+            + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
+        sendResponse(s, (short)OP_STATUS_ERROR, socketWriteTimeout);
+        return;
+      }
+
       Socket targetSock = null;
       short opStatus = OP_STATUS_SUCCESS;
       BlockSender blockSender = null;
       DataOutputStream targetOut = null;
       try {
-        balancingSem.acquireUninterruptibly();
-        
         // check if the block exists or not
         blockSender = new BlockSender(block, 0, -1, false, false, false);
 
@@ -1410,6 +1446,9 @@
             + target.getName() + ": " + StringUtils.stringifyException(ioe));
         throw ioe;
       } finally {
+        // now release the thread resource
+        balancingThrottler.release();
+
         /* send response to the requester */
         try {
           sendResponse(s, opStatus, socketWriteTimeout);
@@ -1420,7 +1459,6 @@
         }
         IOUtils.closeStream(targetOut);
         IOUtils.closeStream(blockSender);
-        balancingSem.release();
       }
     }
 
@@ -1433,12 +1471,17 @@
      * @throws IOException
      */
     private void replaceBlock(DataInputStream in) throws IOException {
-      balancingSem.acquireUninterruptibly();
-
       /* read header */
-      Block block = new Block(in.readLong(), estimateBlockSize, in.readLong()); // block id & len
+      long blockId = in.readLong();
+      Block block = new Block(blockId, estimateBlockSize, in.readLong()); // block id & len
       String sourceID = Text.readString(in);
 
+      if (!balancingThrottler.acquire()) { // not able to start
+        LOG.warn("Not able to receive block " + blockId + " from "
+              + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
+        return;
+      }
+
       short opStatus = OP_STATUS_SUCCESS;
       BlockReceiver blockReceiver = null;
       try {
@@ -1458,6 +1501,8 @@
         opStatus = OP_STATUS_ERROR;
         throw ioe;
       } finally {
+        balancingThrottler.release();
+
         // send response back
         try {
           sendResponse(s, opStatus, socketWriteTimeout);
@@ -1465,7 +1510,6 @@
           LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
         }
         IOUtils.closeStream(blockReceiver);
-        balancingSem.release();
       }
     }
   }

Modified: hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java?rev=709023&r1=709022&r2=709023&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java (original)
+++ hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java Wed Oct 29 14:51:37 2008
@@ -212,7 +212,7 @@
     Socket sock = new Socket();
     sock.connect(NetUtils.createSocketAddr(
         sourceProxy.getName()), FSConstants.READ_TIMEOUT);
-    sock.setSoTimeout(FSConstants.READ_TIMEOUT);
+    sock.setKeepAlive(true);
     // sendRequest
     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
     out.writeShort(FSConstants.DATA_TRANSFER_VERSION);