You are viewing a plain text version of this content. The canonical link for it is here (link omitted in this plain-text rendering; see the message archive).
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/03/09 01:17:35 UTC
svn commit: r384385 - in
/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs: DFSClient.java
DataNode.java FSNamesystem.java NameNode.java
Author: cutting
Date: Wed Mar 8 16:17:27 2006
New Revision: 384385
URL: http://svn.apache.org/viewcvs?rev=384385&view=rev
Log:
Fix for HADOOP-66. DFS blocks are no longer written to local temp files. If a connection to a datanode fails then an exception is now thrown, rather than trying to re-connect to another datanode. Timeouts were also removed from datanode connections, since these caused a lot of failed connections.
Modified:
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=384385&r1=384384&r2=384385&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Mar 8 16:17:27 2006
@@ -377,7 +377,7 @@
}
try {
s = new Socket(targetAddr.getAddress(), targetAddr.getPort());
- s.setSoTimeout(READ_TIMEOUT);
+ //s.setSoTimeout(READ_TIMEOUT);
//
// Xmit header info to datanode
@@ -528,11 +528,8 @@
private UTF8 src;
boolean closingDown = false;
private boolean overwrite;
- private boolean blockStreamWorking;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
- private File backupFile;
- private OutputStream backupStream;
private Block block;
private DatanodeInfo targets[];
private long filePos = 0;
@@ -546,9 +543,7 @@
this.overwrite = overwrite;
this.blockStream = null;
this.blockReplyStream = null;
- this.blockStreamWorking = false;
- this.backupFile = File.createTempFile("dfsout", "bak");
- this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
+
nextBlockOutputStream(true);
}
@@ -558,12 +553,6 @@
* Must get block ID and the IDs of the destinations from the namenode.
*/
private synchronized void nextBlockOutputStream(boolean firstTime) throws IOException {
- if (! firstTime && blockStreamWorking) {
- blockStream.close();
- blockReplyStream.close();
- blockStreamWorking = false;
- }
-
boolean retry = false;
long start = System.currentTimeMillis();
do {
@@ -602,7 +591,7 @@
Socket s = null;
try {
s = new Socket(target.getAddress(), target.getPort());
- s.setSoTimeout(READ_TIMEOUT);
+ //s.setSoTimeout(READ_TIMEOUT);
} catch (IOException ie) {
// Connection failed. Let's wait a little bit and retry
try {
@@ -636,7 +625,6 @@
bytesWrittenToBlock = 0;
blockStream = out;
blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
- blockStreamWorking = true;
} while (retry);
}
@@ -717,27 +705,21 @@
//
// To the blockStream, write length, then bytes
//
- if (blockStreamWorking) {
- try {
- blockStream.writeLong(workingPos);
- blockStream.write(outBuf, 0, workingPos);
- } catch (IOException ie) {
- try {
- blockStream.close();
- } catch (IOException ie2) {
- }
- try {
- blockReplyStream.close();
- } catch (IOException ie2) {
- }
- namenode.abandonBlock(block, src.toString());
- blockStreamWorking = false;
- }
+ try {
+ blockStream.writeLong(workingPos);
+ blockStream.write(outBuf, 0, workingPos);
+ } catch (IOException ie) {
+ try {
+ blockStream.close();
+ } catch (IOException ie2) {
+ }
+ try {
+ blockReplyStream.close();
+ } catch (IOException ie2) {
+ }
+ namenode.abandonBlock(block, src.toString());
+ throw ie;
}
- //
- // To the local block backup, write just the bytes
- //
- backupStream.write(outBuf, 0, workingPos);
//
// Track position
@@ -752,79 +734,20 @@
* We're done writing to the current block.
*/
private synchronized void endBlock() throws IOException {
- boolean mustRecover = ! blockStreamWorking;
-
- //
- // A zero-length set of data indicates the end of the block
- //
- if (blockStreamWorking) {
- try {
- internalClose();
- } catch (IOException ie) {
- try {
- blockStream.close();
- } catch (IOException ie2) {
- }
- try {
- blockReplyStream.close();
- } catch (IOException ie2) {
- }
- namenode.abandonBlock(block, src.toString());
- mustRecover = true;
- } finally {
- blockStreamWorking = false;
- }
- }
-
- //
- // Done with local copy
- //
- backupStream.close();
-
- //
- // If necessary, recover from a failed datanode connection.
- //
- while (mustRecover) {
- nextBlockOutputStream(false);
- InputStream in = new FileInputStream(backupFile);
- try {
- byte buf[] = new byte[BUFFER_SIZE];
- int bytesRead = in.read(buf);
- while (bytesRead >= 0) {
- blockStream.writeLong((long) bytesRead);
- blockStream.write(buf, 0, bytesRead);
- bytesRead = in.read(buf);
- }
- internalClose();
- LOG.info("Recovered from failed datanode connection");
- mustRecover = false;
- } catch (IOException ie) {
- try {
- blockStream.close();
- } catch (IOException ie2) {
- }
- try {
- blockReplyStream.close();
- } catch (IOException ie2) {
- }
- namenode.abandonBlock(block, src.toString());
- blockStreamWorking = false;
- }
+ try {
+ internalClose();
+ } catch (IOException ie) {
+ namenode.abandonBlock(block, src.toString());
+ throw ie;
}
-
- //
- // Delete local backup, start new one
- //
- backupFile.delete();
- backupFile = File.createTempFile("dfsout", "bak");
- backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
}
/**
- * Close down stream to remote datanode. Called from two places
- * in endBlock();
+ * Close down stream to remote datanode.
*/
private synchronized void internalClose() throws IOException {
+ try {
+ // A zero-length set of data indicates the end of the block
blockStream.writeLong(0);
blockStream.flush();
@@ -838,8 +761,16 @@
lb.readFields(blockReplyStream);
namenode.reportWrittenBlock(lb);
- blockStream.close();
- blockReplyStream.close();
+ } finally {
+ try {
+ blockStream.close();
+ } catch (IOException ie2) {
+ }
+ try {
+ blockReplyStream.close();
+ } catch (IOException ie2) {
+ }
+ }
}
/**
@@ -855,14 +786,9 @@
flush();
endBlock();
- backupStream.close();
- backupFile.delete();
+ blockStream.close();
+ blockReplyStream.close();
- if (blockStreamWorking) {
- blockStream.close();
- blockReplyStream.close();
- blockStreamWorking = false;
- }
super.close();
long localstart = System.currentTimeMillis();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=384385&r1=384384&r2=384385&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Mar 8 16:17:27 2006
@@ -286,7 +286,7 @@
try {
while (shouldListen) {
Socket s = ss.accept();
- s.setSoTimeout(READ_TIMEOUT);
+ //s.setSoTimeout(READ_TIMEOUT);
new Daemon(new DataXceiver(s)).start();
}
ss.close();
@@ -368,10 +368,10 @@
// Connect to backup machine
mirrorTarget = createSocketAddr(targets[1].getName().toString());
try {
- Socket s = new Socket(mirrorTarget.getAddress(), mirrorTarget.getPort());
- s.setSoTimeout(READ_TIMEOUT);
- out2 = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
- in2 = new DataInputStream(new BufferedInputStream(s.getInputStream()));
+ Socket s2 = new Socket(mirrorTarget.getAddress(), mirrorTarget.getPort());
+ //s2.setSoTimeout(READ_TIMEOUT);
+ out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));
+ in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));
// Write connection header
out2.write(OP_WRITE_BLOCK);
@@ -507,6 +507,7 @@
mirrors.add(curTarget);
LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));
newLB.write(reply);
+ reply.flush();
} finally {
reply.close();
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=384385&r1=384384&r2=384385&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Mar 8 16:17:27 2006
@@ -1277,15 +1277,14 @@
//
// Build list of machines we can actually choose from
//
- long latestRemaining = 0;
Vector targetList = new Vector();
for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
DatanodeInfo node = (DatanodeInfo) it.next();
if (! forbiddenMachines.contains(node.getHost())) {
targetList.add(node);
- latestRemaining += node.getRemaining();
}
}
+ Collections.shuffle(targetList);
//
// Now pick one
@@ -1309,12 +1308,9 @@
//
// Otherwise, choose node according to target capacity
//
- double target = Math.abs(r.nextDouble()) * latestRemaining;
for (Iterator it = targetList.iterator(); it.hasNext(); ) {
DatanodeInfo node = (DatanodeInfo) it.next();
- target -= node.getRemaining();
- if ((node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) &&
- (target <= 0)) {
+ if ((node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE)) {
return node;
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=384385&r1=384384&r2=384385&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Wed Mar 8 16:17:27 2006
@@ -184,9 +184,7 @@
* The client needs to give up on the block.
*/
public void abandonBlock(Block b, String src) throws IOException {
- if (! namesystem.abandonBlock(b, new UTF8(src))) {
- throw new IOException("Cannot abandon block during write to " + src);
- }
+ namesystem.abandonBlock(b, new UTF8(src));
}
/**
*/