You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2010/11/09 05:25:22 UTC

svn commit: r1032836 - in /hadoop/hdfs/trunk: ./ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/java/org/apache/hadoop/hdfs/util/ src/test/hdfs/org/apache...

Author: hairong
Date: Tue Nov  9 04:25:22 2010
New Revision: 1032836

URL: http://svn.apache.org/viewvc?rev=1032836&view=rev
Log:
HDFS-1457. Provide an option to throttle image transmission between primary and secondary NameNodes. Contributed by Yifei Lu and Hairong Kuang. 

Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java
Removed:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/hdfs-default.xml
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Nov  9 04:25:22 2010
@@ -39,6 +39,9 @@ Trunk (unreleased changes)
 
     HDFS-903.  Support fsimage validation through MD5 checksum. (hairong)
 
+    HDFS-1457. Provide an option to throttle image transmission between
+    primary and secondary NameNodes. (Yifei Lu and hairong via hairong)
+
   IMPROVEMENTS
 
     HDFS-1304. Add a new unit test for HftpFileSystem.open(..).  (szetszwo)

Modified: hadoop/hdfs/trunk/src/java/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/hdfs-default.xml?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/hdfs-default.xml (original)
+++ hadoop/hdfs/trunk/src/java/hdfs-default.xml Tue Nov  9 04:25:22 2010
@@ -544,4 +544,14 @@ creations/deletions), or "all".</descrip
   </description>
 </property>
 
+<property>
+  <name>dfs.image.transfer.bandwidthPerSec</name>
+  <value>0</value>
+  <description>
+        Specifies the maximum amount of bandwidth that can be utilized for image
+        transfer in terms of the number of bytes per second.
+        A value of 0 or less (the default is 0) disables throttling.
+  </description>
+</property>
+
 </configuration>

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Nov  9 04:25:22 2010
@@ -206,6 +206,10 @@ public class DFSConfigKeys extends Commo
   public static final String DFS_IMAGE_COMPRESSION_CODEC_DEFAULT =
                                    "org.apache.hadoop.io.compress.DefaultCodec";
 
+  public static final String DFS_IMAGE_TRANSFER_RATE_KEY =
+                                           "dfs.image.transfer.bandwidthPerSec";
+  public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0;  //no throttling
+
   //Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
   public static final String  DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout";

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Nov  9 04:25:22 2010
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -72,7 +73,7 @@ class BlockReceiver implements java.io.C
   private String mirrorAddr;
   private DataOutputStream mirrorOut;
   private Daemon responder = null;
-  private BlockTransferThrottler throttler;
+  private DataTransferThrottler throttler;
   private FSDataset.BlockWriteStreams streams;
   private String clientName;
   DatanodeInfo srcDataNode = null;
@@ -602,7 +603,7 @@ class BlockReceiver implements java.io.C
       DataOutputStream mirrOut, // output to next datanode
       DataInputStream mirrIn,   // input from next datanode
       DataOutputStream replyOut,  // output to previous datanode
-      String mirrAddr, BlockTransferThrottler throttlerArg,
+      String mirrAddr, DataTransferThrottler throttlerArg,
       int numTargets) throws IOException {
 
       boolean responderClosed = false;

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Nov  9 04:25:22 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.ChecksumExce
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.DataChecksum;
@@ -68,7 +69,7 @@ class BlockSender implements java.io.Clo
   private boolean transferToAllowed = true;
   private boolean blockReadFully; //set when the whole block is read
   private boolean verifyChecksum; //if true, check is verified while reading
-  private BlockTransferThrottler throttler;
+  private DataTransferThrottler throttler;
   private final String clientTraceFmt; // format of client trace log message
 
   /**
@@ -419,7 +420,7 @@ class BlockSender implements java.io.Clo
    * @return total bytes reads, including crc.
    */
   long sendBlock(DataOutputStream out, OutputStream baseStream, 
-                 BlockTransferThrottler throttler) throws IOException {
+                 DataTransferThrottler throttler) throws IOException {
     if( out == null ) {
       throw new IOException( "out stream is null" );
     }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Tue Nov  9 04:25:22 2010
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -102,7 +103,7 @@ class DataBlockScanner implements Runnab
   
   Random random = new Random();
   
-  BlockTransferThrottler throttler = null;
+  DataTransferThrottler throttler = null;
   
   // Reconciles blocks on disk to blocks in memory
   DirectoryScanner dirScanner;
@@ -251,7 +252,7 @@ class DataBlockScanner implements Runnab
     }
     
     synchronized (this) {
-      throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
+      throttler = new DataTransferThrottler(200, MAX_SCAN_RATE);
     }
   }
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Tue Nov  9 04:25:22 2010
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.balancer.Balancer;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -64,7 +65,7 @@ class DataXceiverServer implements Runna
    * It limits the number of block moves for balancing and
    * the total amount of bandwidth they can use.
    */
-  static class BlockBalanceThrottler extends BlockTransferThrottler {
+  static class BlockBalanceThrottler extends DataTransferThrottler {
    private int numThreads;
    
    /**Constructor

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java Tue Nov  9 04:25:22 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 
@@ -77,13 +78,15 @@ public class GetImageServlet extends Htt
                 String.valueOf(nnImage.getFsImageName().length()));
             // send fsImage
             TransferFsImage.getFileServer(response.getOutputStream(),
-                nnImage.getFsImageName()); 
+                nnImage.getFsImageName(),
+                getThrottler(conf)); 
           } else if (ff.getEdit()) {
             response.setHeader(TransferFsImage.CONTENT_LENGTH,
                 String.valueOf(nnImage.getFsEditName().length()));
             // send edits
             TransferFsImage.getFileServer(response.getOutputStream(),
-                nnImage.getFsEditName());
+                nnImage.getFsEditName(),
+                getThrottler(conf));
           } else if (ff.putImage()) {
             // issue a HTTP get request to download the new fsimage 
             nnImage.validateCheckpointUpload(ff.getToken());
@@ -123,6 +126,22 @@ public class GetImageServlet extends Htt
     }
   }
   
+  /**
+   * Build an image-transfer throttler from the configured bandwidth.
+   * @param conf configuration holding the image transfer rate
+   * @return a throttler, or null when the configured rate disables throttling
+   */
+  private final DataTransferThrottler getThrottler(Configuration conf) {
+    final long bandwidthPerSec = conf.getLong(
+        DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
+        DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
+    // Only a strictly positive rate turns throttling on.
+    if (bandwidthPerSec <= 0) {
+      return null;
+    }
+    return new DataTransferThrottler(bandwidthPerSec);
+  }
+  
   @SuppressWarnings("deprecation")
   protected boolean isValidRequestor(String remoteUser, Configuration conf)
       throws IOException {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java Tue Nov  9 04:25:22 2010
@@ -27,6 +27,7 @@ import javax.servlet.http.HttpServletReq
 
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -124,7 +125,8 @@ class TransferFsImage implements FSConst
    * A server-side method to respond to a getfile http request
    * Copies the contents of the local file into the output stream.
    */
-  static void getFileServer(OutputStream outstream, File localfile) 
+  static void getFileServer(OutputStream outstream, File localfile,
+      DataTransferThrottler throttler) 
     throws IOException {
     byte buf[] = new byte[BUFFER_SIZE];
     FileInputStream infile = null;
@@ -153,6 +155,9 @@ class TransferFsImage implements FSConst
           break;
         }
         outstream.write(buf, 0, num);
+        if (throttler != null) {
+          throttler.throttle(num);
+        }
       }
     } finally {
       if (infile != null) {

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java?rev=1032836&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java Tue Nov  9 04:25:22 2010
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+/** 
+ * A class to throttle data transfers.
+ * This class is thread safe. It can be shared by multiple threads.
+ * The parameter bandwidthPerSec specifies the total bandwidth shared by
+ * threads.
+ */
+public class DataTransferThrottler {
+  private final long period;          // period (ms) over which bw is imposed
+  private final long periodExtension; // max period (ms) over which bw accumulates
+  private long bytesPerPeriod; // total number of bytes that can be sent per period
+  private long curPeriodStart; // current period start time (monotonic ms)
+  private long curReserve;     // remaining bytes that can be sent in the period
+  // sum of numOfBytes over throttle() calls that have not yet returned
+  private long bytesAlreadyUsed;
+
+  /** Constructor 
+   * @param bandwidthPerSec bandwidth allowed in bytes per second. 
+   */
+  public DataTransferThrottler(long bandwidthPerSec) {
+    this(500, bandwidthPerSec);  // by default throttling period is 500ms 
+  }
+
+  /**
+   * Constructor
+   * @param period in milliseconds. Bandwidth is enforced over this
+   *        period.
+   * @param bandwidthPerSec bandwidth allowed in bytes per second. 
+   *        The per-period allowance is bandwidthPerSec*period/1000,
+   *        truncated; if that is 0, throttle() blocks until interrupted.
+   * @throws IllegalArgumentException if period is not positive
+   */
+  public DataTransferThrottler(long period, long bandwidthPerSec) {
+    if (period <= 0) {
+      throw new IllegalArgumentException("period must be positive: " + period);
+    }
+    this.curPeriodStart = monotonicNowMs();
+    this.period = period;
+    this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
+    this.periodExtension = period*3;
+  }
+
+  /**
+   * @return a monotonic timestamp in milliseconds. Unlike
+   *         System.currentTimeMillis() it does not jump when the wall
+   *         clock is adjusted, so period arithmetic stays correct.
+   */
+  private static long monotonicNowMs() {
+    return System.nanoTime() / 1000000L;
+  }
+
+  /**
+   * @return current throttle bandwidth in bytes per second.
+   */
+  public synchronized long getBandwidth() {
+    return bytesPerPeriod*1000/period;
+  }
+  
+  /**
+   * Sets throttle bandwidth. This takes effect latest by the end of current
+   * period.
+   * 
+   * @param bytesPerSecond new bandwidth in bytes per second; must be positive
+   * @throws IllegalArgumentException if bytesPerSecond is not positive
+   */
+  public synchronized void setBandwidth(long bytesPerSecond) {
+    if ( bytesPerSecond <= 0 ) {
+      throw new IllegalArgumentException("" + bytesPerSecond);
+    }
+    bytesPerPeriod = bytesPerSecond*period/1000;
+  }
+  
+  /** Given the numOfBytes sent/received since last time throttle was called,
+   * make the current thread sleep if I/O rate is too fast
+   * compared to the given bandwidth.
+   * If the thread is interrupted while waiting, this method returns early
+   * and leaves the thread's interrupt status set.
+   *
+   * @param numOfBytes
+   *     number of bytes sent/received since last time throttle was called
+   */
+  public synchronized void throttle(long numOfBytes) {
+    if ( numOfBytes <= 0 ) {
+      return;
+    }
+
+    curReserve -= numOfBytes;
+    bytesAlreadyUsed += numOfBytes;
+
+    while (curReserve <= 0) {
+      long now = monotonicNowMs();
+      long curPeriodEnd = curPeriodStart + period;
+
+      if ( now < curPeriodEnd ) {
+        // Wait for next period so that curReserve can be increased.
+        // wait() releases the monitor, so other threads may enter meanwhile.
+        try {
+          wait( curPeriodEnd - now );
+        } catch (InterruptedException e) {
+          // Abort throttling and restore the interrupt status so that
+          // interrupt handling higher in the call stack still runs.
+          Thread.currentThread().interrupt();
+          break;
+        }
+      } else if ( now <  (curPeriodStart + periodExtension)) {
+        // The period is over: move to the next one and add its allowance.
+        curPeriodStart = curPeriodEnd;
+        curReserve += bytesPerPeriod;
+      } else {
+        // discard the prev period. Throttler might not have
+        // been used for a long time.
+        curPeriodStart = now;
+        curReserve = bytesPerPeriod - bytesAlreadyUsed;
+      }
+    }
+
+    bytesAlreadyUsed -= numOfBytes;
+  }
+}

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Tue Nov  9 04:25:22 2010
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
@@ -67,7 +68,7 @@ public class TestBlockReplacement extend
     final long TOTAL_BYTES =6*bandwidthPerSec; 
     long bytesToSend = TOTAL_BYTES; 
     long start = Util.now();
-    BlockTransferThrottler throttler = new BlockTransferThrottler(bandwidthPerSec);
+    DataTransferThrottler throttler = new DataTransferThrottler(bandwidthPerSec);
     long totalBytes = 0L;
     long bytesSent = 1024*512L; // 0.5MB
     throttler.throttle(bytesSent);