You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2010/11/09 05:25:22 UTC
svn commit: r1032836 - in /hadoop/hdfs/trunk: ./ src/java/
src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/java/org/apache/hadoop/hdfs/server/namenode/
src/java/org/apache/hadoop/hdfs/util/ src/test/hdfs/org/apache...
Author: hairong
Date: Tue Nov 9 04:25:22 2010
New Revision: 1032836
URL: http://svn.apache.org/viewvc?rev=1032836&view=rev
Log:
HDFS-1457. Provide an option to throttle image transmission between primary and secondary NameNodes. Contributed by Yifei Lu and Hairong Kuang.
Added:
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java
Removed:
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/hdfs-default.xml
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Nov 9 04:25:22 2010
@@ -39,6 +39,9 @@ Trunk (unreleased changes)
HDFS-903. Support fsimage validation through MD5 checksum. (hairong)
+ HDFS-1457. Provide an option to throttle image transmission between
+ primary and secondary NameNodes. (Yifei Lu and hairong via hairong)
+
IMPROVEMENTS
HDFS-1304. Add a new unit test for HftpFileSystem.open(..). (szetszwo)
Modified: hadoop/hdfs/trunk/src/java/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/hdfs-default.xml?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/hdfs-default.xml (original)
+++ hadoop/hdfs/trunk/src/java/hdfs-default.xml Tue Nov 9 04:25:22 2010
@@ -544,4 +544,14 @@ creations/deletions), or "all".</descrip
</description>
</property>
+<property>
+ <name>dfs.image.transfer.bandwidthPerSec</name>
+ <value>0</value>
+ <description>
+ Specifies the maximum amount of bandwidth that can be utilized for image
+ transfer in terms of the number of bytes per second.
+ A default value of 0 indicates that throttling is disabled.
+ </description>
+</property>
+
</configuration>
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Nov 9 04:25:22 2010
@@ -206,6 +206,10 @@ public class DFSConfigKeys extends Commo
public static final String DFS_IMAGE_COMPRESSION_CODEC_DEFAULT =
"org.apache.hadoop.io.compress.DefaultCodec";
+ public static final String DFS_IMAGE_TRANSFER_RATE_KEY =
+ "dfs.image.transfer.bandwidthPerSec";
+ public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0; //no throttling
+
//Keys with no defaults
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout";
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Nov 9 04:25:22 2010
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
@@ -72,7 +73,7 @@ class BlockReceiver implements java.io.C
private String mirrorAddr;
private DataOutputStream mirrorOut;
private Daemon responder = null;
- private BlockTransferThrottler throttler;
+ private DataTransferThrottler throttler;
private FSDataset.BlockWriteStreams streams;
private String clientName;
DatanodeInfo srcDataNode = null;
@@ -602,7 +603,7 @@ class BlockReceiver implements java.io.C
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
- String mirrAddr, BlockTransferThrottler throttlerArg,
+ String mirrAddr, DataTransferThrottler throttlerArg,
int numTargets) throws IOException {
boolean responderClosed = false;
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Nov 9 04:25:22 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.ChecksumExce
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
@@ -68,7 +69,7 @@ class BlockSender implements java.io.Clo
private boolean transferToAllowed = true;
private boolean blockReadFully; //set when the whole block is read
private boolean verifyChecksum; //if true, check is verified while reading
- private BlockTransferThrottler throttler;
+ private DataTransferThrottler throttler;
private final String clientTraceFmt; // format of client trace log message
/**
@@ -419,7 +420,7 @@ class BlockSender implements java.io.Clo
* @return total bytes reads, including crc.
*/
long sendBlock(DataOutputStream out, OutputStream baseStream,
- BlockTransferThrottler throttler) throws IOException {
+ DataTransferThrottler throttler) throws IOException {
if( out == null ) {
throw new IOException( "out stream is null" );
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Tue Nov 9 04:25:22 2010
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
@@ -102,7 +103,7 @@ class DataBlockScanner implements Runnab
Random random = new Random();
- BlockTransferThrottler throttler = null;
+ DataTransferThrottler throttler = null;
// Reconciles blocks on disk to blocks in memory
DirectoryScanner dirScanner;
@@ -251,7 +252,7 @@ class DataBlockScanner implements Runnab
}
synchronized (this) {
- throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
+ throttler = new DataTransferThrottler(200, MAX_SCAN_RATE);
}
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Tue Nov 9 04:25:22 2010
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -64,7 +65,7 @@ class DataXceiverServer implements Runna
* It limits the number of block moves for balancing and
* the total amount of bandwidth they can use.
*/
- static class BlockBalanceThrottler extends BlockTransferThrottler {
+ static class BlockBalanceThrottler extends DataTransferThrottler {
private int numThreads;
/**Constructor
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java Tue Nov 9 04:25:22 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@@ -77,13 +78,15 @@ public class GetImageServlet extends Htt
String.valueOf(nnImage.getFsImageName().length()));
// send fsImage
TransferFsImage.getFileServer(response.getOutputStream(),
- nnImage.getFsImageName());
+ nnImage.getFsImageName(),
+ getThrottler(conf));
} else if (ff.getEdit()) {
response.setHeader(TransferFsImage.CONTENT_LENGTH,
String.valueOf(nnImage.getFsEditName().length()));
// send edits
TransferFsImage.getFileServer(response.getOutputStream(),
- nnImage.getFsEditName());
+ nnImage.getFsEditName(),
+ getThrottler(conf));
} else if (ff.putImage()) {
// issue a HTTP get request to download the new fsimage
nnImage.validateCheckpointUpload(ff.getToken());
@@ -123,6 +126,22 @@ public class GetImageServlet extends Htt
}
}
+ /**
+ * Construct a throttler from conf.
+ * Reads DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY (falling back to
+ * DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT) as the allowed bandwidth
+ * in bytes per second.
+ * @param conf configuration
+ * @return a data transfer throttler, or null when the configured bandwidth
+ * is not positive (i.e. throttling is disabled); callers must handle null
+ */
+ private final DataTransferThrottler getThrottler(Configuration conf) {
+ long transferBandwidth =
+ conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
+ DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
+ DataTransferThrottler throttler = null; // stays null when throttling is disabled
+ if (transferBandwidth > 0) {
+ throttler = new DataTransferThrottler(transferBandwidth);
+ }
+ return throttler;
+ }
+
@SuppressWarnings("deprecation")
protected boolean isValidRequestor(String remoteUser, Configuration conf)
throws IOException {
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java Tue Nov 9 04:25:22 2010
@@ -27,6 +27,7 @@ import javax.servlet.http.HttpServletReq
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
@@ -124,7 +125,8 @@ class TransferFsImage implements FSConst
* A server-side method to respond to a getfile http request
* Copies the contents of the local file into the output stream.
*/
- static void getFileServer(OutputStream outstream, File localfile)
+ static void getFileServer(OutputStream outstream, File localfile,
+ DataTransferThrottler throttler)
throws IOException {
byte buf[] = new byte[BUFFER_SIZE];
FileInputStream infile = null;
@@ -153,6 +155,9 @@ class TransferFsImage implements FSConst
break;
}
outstream.write(buf, 0, num);
+ if (throttler != null) {
+ throttler.throttle(num);
+ }
}
} finally {
if (infile != null) {
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java?rev=1032836&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java Tue Nov 9 04:25:22 2010
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+/**
+ * A class to throttle data transfers.
+ * This class is thread safe. It can be shared by multiple threads.
+ * The parameter bandwidthPerSec specifies the total bandwidth shared by
+ * threads.
+ *
+ * Bandwidth is enforced per fixed-length period: each period grants a budget
+ * of bytesPerPeriod bytes, and throttle() blocks its caller while the
+ * remaining budget (curReserve) is not positive.
+ */
+public class DataTransferThrottler {
+ private long period; // period over which bw is imposed, in milliseconds
+ private long periodExtension; // Max period over which bw accumulates (set to 3 * period).
+ private long bytesPerPeriod; // total number of bytes that can be sent in each period
+ private long curPeriodStart; // current period starting time (System.currentTimeMillis() based)
+ private long curReserve; // remaining bytes that can be sent in the period; can go negative
+ private long bytesAlreadyUsed; // bytes charged by throttle() calls that have not yet returned
+
+ /** Constructor using the default throttling period.
+ * @param bandwidthPerSec bandwidth allowed in bytes per second.
+ */
+ public DataTransferThrottler(long bandwidthPerSec) {
+ this(500, bandwidthPerSec); // by default throttling period is 500ms
+ }
+
+ /**
+ * Constructor
+ * @param period in milliseconds. Bandwidth is enforced over this
+ * period.
+ * @param bandwidthPerSec bandwidth allowed in bytes per second.
+ *
+ * NOTE(review): unlike setBandwidth, neither argument is validated here.
+ * A period of 0 would make getBandwidth() divide by zero, and the integer
+ * division below truncates, so a small bandwidthPerSec can yield a
+ * bytesPerPeriod of 0 -- confirm callers never pass such values.
+ */
+ public DataTransferThrottler(long period, long bandwidthPerSec) {
+ this.curPeriodStart = System.currentTimeMillis();
+ this.period = period;
+ this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000; // start with one full period of budget
+ this.periodExtension = period*3;
+ }
+
+ /**
+ * @return current throttle bandwidth in bytes per second. The value is
+ * recomputed from bytesPerPeriod, so because of integer truncation it may
+ * be slightly lower than the value originally supplied.
+ */
+ public synchronized long getBandwidth() {
+ return bytesPerPeriod*1000/period;
+ }
+
+ /**
+ * Sets throttle bandwidth. This takes effect at the latest by the end of
+ * the current period.
+ *
+ * @param bytesPerSecond new bandwidth in bytes per second; must be positive
+ * @throws IllegalArgumentException if bytesPerSecond is not positive
+ */
+ public synchronized void setBandwidth(long bytesPerSecond) {
+ if ( bytesPerSecond <= 0 ) {
+ throw new IllegalArgumentException("" + bytesPerSecond);
+ }
+ bytesPerPeriod = bytesPerSecond*period/1000; // curReserve is untouched; only later refills use the new value
+ }
+
+ /** Given the numOfBytes sent/received since last time throttle was called,
+ * make the current thread sleep if I/O rate is too fast
+ * compared to the given bandwidth.
+ *
+ * The bytes are charged against curReserve first; the method then loops
+ * until curReserve becomes positive again. On each iteration it either
+ * waits for the end of the current period, advances to the next period
+ * (adding bytesPerPeriod to the reserve), or, when more than
+ * periodExtension has elapsed since curPeriodStart, restarts the period
+ * at the current time.
+ *
+ * @param numOfBytes
+ * number of bytes sent/received since last time throttle was called;
+ * values that are not positive are ignored
+ */
+ public synchronized void throttle(long numOfBytes) {
+ if ( numOfBytes <= 0 ) {
+ return;
+ }
+
+ curReserve -= numOfBytes;
+ bytesAlreadyUsed += numOfBytes; // undone at the end of this call
+
+ while (curReserve <= 0) {
+ long now = System.currentTimeMillis();
+ long curPeriodEnd = curPeriodStart + period;
+
+ if ( now < curPeriodEnd ) {
+ // Wait for next period so that curReserve can be increased.
+ try {
+ wait( curPeriodEnd - now ); // releases this object's monitor while waiting; no notify is issued in this class
+ } catch (InterruptedException ignored) {} // NOTE(review): interrupt is swallowed and the interrupt status is not restored; the loop simply re-checks
+ } else if ( now < (curPeriodStart + periodExtension)) { // period ended recently: roll forward, carrying over any deficit
+ curPeriodStart = curPeriodEnd;
+ curReserve += bytesPerPeriod;
+ } else {
+ // discard the prev period. Throttler might not have
+ // been used for a long time.
+ curPeriodStart = now;
+ curReserve = bytesPerPeriod - bytesAlreadyUsed; // fresh budget minus bytes of calls still in progress
+ }
+ }
+
+ bytesAlreadyUsed -= numOfBytes;
+ }
+}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1032836&r1=1032835&r2=1032836&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Tue Nov 9 04:25:22 2010
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.F
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
@@ -67,7 +68,7 @@ public class TestBlockReplacement extend
final long TOTAL_BYTES =6*bandwidthPerSec;
long bytesToSend = TOTAL_BYTES;
long start = Util.now();
- BlockTransferThrottler throttler = new BlockTransferThrottler(bandwidthPerSec);
+ DataTransferThrottler throttler = new DataTransferThrottler(bandwidthPerSec);
long totalBytes = 0L;
long bytesSent = 1024*512L; // 0.5MB
throttler.throttle(bytesSent);