You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2008/10/29 22:51:37 UTC
svn commit: r709023 - in /hadoop/core/branches/branch-0.18: CHANGES.txt
src/hdfs/org/apache/hadoop/dfs/Balancer.java
src/hdfs/org/apache/hadoop/dfs/DataNode.java
src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
Author: hairong
Date: Wed Oct 29 14:51:37 2008
New Revision: 709023
URL: http://svn.apache.org/viewvc?rev=709023&view=rev
Log:
HADOOP-4533. HDFS client of hadoop 0.18.1 and HDFS server 0.18.2 (0.18 branch) not compatible. Contributed by Hairong Kuang.
Modified:
hadoop/core/branches/branch-0.18/CHANGES.txt
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=709023&r1=709022&r2=709023&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Wed Oct 29 14:51:37 2008
@@ -46,6 +46,10 @@
HADOOP-4526. fsck failing with NullPointerException. (hairong)
+ HADOOP-4533. Reverted the patch to HADOOP-4116 to solve the 0.18.1 and
+ 0.18.2 incompatibility problem and fixed the balancing semaphore
+ contention problem without changing the datanode protocol. (hairong)
+
NEW FEATURES
HADOOP-2421. Add jdiff output to documentation, listing all API
Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java?rev=709023&r1=709022&r2=709023&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java Wed Oct 29 14:51:37 2008
@@ -174,6 +174,11 @@
LogFactory.getLog("org.apache.hadoop.dfs.Balancer");
final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
+ /** The maximum number of concurrent blocks moves for
+ * balancing purpose at a datanode
+ */
+ public static final int MAX_NUM_CONCURRENT_MOVES = 5;
+
private Configuration conf;
private double threshold = 10D;
@@ -302,9 +307,7 @@
try {
sock.connect(DataNode.createSocketAddr(
proxySource.datanode.getName()), FSConstants.READ_TIMEOUT);
- long bandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
- sock.setSoTimeout(2*FSConstants.READ_TIMEOUT+
- (int)(block.getNumBytes()*1500/bandwidth));
+ sock.setKeepAlive(true);
out = new DataOutputStream( new BufferedOutputStream(
sock.getOutputStream(), FSConstants.BUFFER_SIZE));
sendRequest(out);
@@ -319,11 +322,6 @@
proxySource.getName() +
" succeeded." );
}
- } catch (SocketTimeoutException te) {
- LOG.warn("Timeout moving block "+block.getBlockId()+
- " from " + source.getName() + " to " +
- target.getName() + " through " +
- proxySource.getName());
} catch (IOException e) {
LOG.warn("Error moving block "+block.getBlockId()+
" from " + source.getName() + " to " +
@@ -476,8 +474,6 @@
/* A class that keeps track of a datanode in Balancer */
private static class BalancerDatanode implements Writable {
final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
- final protected static short MAX_NUM_CONCURRENT_MOVES =
- DataNode.MAX_BALANCING_THREADS;
protected DatanodeInfo datanode;
private double utilization;
protected long maxSizeToMove;
Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java?rev=709023&r1=709022&r2=709023&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java Wed Oct 29 14:51:37 2008
@@ -146,6 +146,45 @@
private static final int MAX_XCEIVER_COUNT = 256;
private int maxXceiverCount = MAX_XCEIVER_COUNT;
+ /** Limits the resources that cluster balancing may consume.
+ * Bandwidth throttling is inherited from the Throttler superclass.
+ *
+ * It caps the number of concurrent balancing block moves at
+ * Balancer.MAX_NUM_CONCURRENT_MOVES and throttles their total bandwidth.
+ */
+ private static class BlockBalanceThrottler extends Throttler {
+ private int numThreads; // moves currently in progress; guarded by this
+
+ /**Constructor.
+ *
+ * @param bandwidth total bandwidth, in bytes/s, available for balancing
+ */
+ private BlockBalanceThrottler(long bandwidth) {
+ super(bandwidth);
+ LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
+ }
+
+ /** Try to reserve a slot for a new balancing block move.
+ *
+ * @return true if fewer than MAX_NUM_CONCURRENT_MOVES moves were running
+ * (the counter is then incremented); false otherwise. Never blocks.
+ */
+ private synchronized boolean acquire() {
+ if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
+ return false;
+ }
+ numThreads++;
+ return true;
+ }
+
+ /** Mark a move as completed and decrement the counter; call exactly once per successful acquire(). */
+ private synchronized void release() {
+ numThreads--;
+ }
+ }
+
+ private BlockBalanceThrottler balancingThrottler;
+
/**
* We need an estimate for block size to check if the disk partition has
* enough space. For now we set it to be the default block size set
@@ -156,12 +195,6 @@
*/
private long estimateBlockSize;
- // The following three fields are to support balancing
- final static short MAX_BALANCING_THREADS = 5;
- private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
- long balanceBandwidth;
- private Throttler balancingThrottler;
-
// For InterDataNodeProtocol
Server ipcServer;
@@ -307,10 +340,8 @@
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
DataNode.nameNodeAddr = nameNodeAddr;
- //set up parameter for cluster balancing
- this.balanceBandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
- LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s");
- this.balancingThrottler = new Throttler(balanceBandwidth);
+ this.balancingThrottler = new BlockBalanceThrottler(
+ conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024));
//initialize periodic block scanner
String reason = null;
@@ -1364,13 +1395,18 @@
DatanodeInfo target = new DatanodeInfo(); // read target
target.readFields(in);
+ if (!balancingThrottler.acquire()) { // not able to start
+ LOG.info("Not able to copy block " + blockId + " to "
+ + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
+ sendResponse(s, (short)OP_STATUS_ERROR, socketWriteTimeout);
+ return;
+ }
+
Socket targetSock = null;
short opStatus = OP_STATUS_SUCCESS;
BlockSender blockSender = null;
DataOutputStream targetOut = null;
try {
- balancingSem.acquireUninterruptibly();
-
// check if the block exists or not
blockSender = new BlockSender(block, 0, -1, false, false, false);
@@ -1410,6 +1446,9 @@
+ target.getName() + ": " + StringUtils.stringifyException(ioe));
throw ioe;
} finally {
+ // now release the thread resource
+ balancingThrottler.release();
+
/* send response to the requester */
try {
sendResponse(s, opStatus, socketWriteTimeout);
@@ -1420,7 +1459,6 @@
}
IOUtils.closeStream(targetOut);
IOUtils.closeStream(blockSender);
- balancingSem.release();
}
}
@@ -1433,12 +1471,17 @@
* @throws IOException
*/
private void replaceBlock(DataInputStream in) throws IOException {
- balancingSem.acquireUninterruptibly();
-
/* read header */
- Block block = new Block(in.readLong(), estimateBlockSize, in.readLong()); // block id & len
+ long blockId = in.readLong();
+ Block block = new Block(blockId, estimateBlockSize, in.readLong()); // block id & len
String sourceID = Text.readString(in);
+ if (!balancingThrottler.acquire()) { // not able to start
+ LOG.warn("Not able to receive block " + blockId + " from "
+ + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
+ return;
+ }
+
short opStatus = OP_STATUS_SUCCESS;
BlockReceiver blockReceiver = null;
try {
@@ -1458,6 +1501,8 @@
opStatus = OP_STATUS_ERROR;
throw ioe;
} finally {
+ balancingThrottler.release();
+
// send response back
try {
sendResponse(s, opStatus, socketWriteTimeout);
@@ -1465,7 +1510,6 @@
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
}
IOUtils.closeStream(blockReceiver);
- balancingSem.release();
}
}
}
Modified: hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java?rev=709023&r1=709022&r2=709023&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java (original)
+++ hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java Wed Oct 29 14:51:37 2008
@@ -212,7 +212,7 @@
Socket sock = new Socket();
sock.connect(NetUtils.createSocketAddr(
sourceProxy.getName()), FSConstants.READ_TIMEOUT);
- sock.setSoTimeout(FSConstants.READ_TIMEOUT);
+ sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
out.writeShort(FSConstants.DATA_TRANSFER_VERSION);