You are viewing a plain-text version of this content. Its canonical link was a hyperlink that is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/03/21 18:49:33 UTC
svn commit: r639730 - in /hadoop/core/branches/branch-0.16: CHANGES.txt
src/java/org/apache/hadoop/dfs/DataNode.java
Author: rangadi
Date: Fri Mar 21 10:49:26 2008
New Revision: 639730
URL: http://svn.apache.org/viewvc?rev=639730&view=rev
Log:
HADOOP-3007. Tolerate mirror failures while DataNode is replicating blocks as it used to before. (rangadi)
Modified:
hadoop/core/branches/branch-0.16/CHANGES.txt
hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java
Modified: hadoop/core/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/CHANGES.txt?rev=639730&r1=639729&r2=639730&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.16/CHANGES.txt Fri Mar 21 10:49:26 2008
@@ -24,6 +24,9 @@
HADOOP-3042. Updates the Javadoc in JobConf.getOutputPath to reflect
the actual temporary path. (Amareshwari Sriramadasu via ddas)
+ HADOOP-3007. Tolerate mirror failures while DataNode is replicating
+ blocks as it used to before. (rangadi)
+
Release 0.16.1 - 2008-03-13
INCOMPATIBLE CHANGES
Modified: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java?rev=639730&r1=639729&r2=639730&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java Fri Mar 21 10:49:26 2008
@@ -1132,7 +1132,14 @@
mirrorIn = null;
IOUtils.closeSocket(mirrorSock);
mirrorSock = null;
- throw e;
+ if (client.length() > 0) {
+ throw e;
+ } else {
+ LOG.info(dnRegistration + ":Exception transfering block " +
+ block + " to mirror " + mirrorNode +
+ ". continuing without the mirror.\n" +
+ StringUtils.stringifyException(e));
+ }
}
}
@@ -2039,6 +2046,26 @@
}
}
+ /**
+ * While writing to mirrorOut, failure to write to mirror should not
+ * affect this datanode unless a client is writing the block.
+ */
+ private void handleMirrorOutError(IOException ioe) throws IOException {
+ LOG.info(dnRegistration + ":Exception writing block " +
+ block + " to mirror " + mirrorAddr + "\n" +
+ StringUtils.stringifyException(ioe));
+ mirrorOut = null;
+ //
+ // If stream-copy fails, continue
+ // writing to disk for replication requests. For client
+ // writes, return error so that the client can do error
+ // recovery.
+ //
+ if (clientName.length() > 0) {
+ throw ioe;
+ }
+ }
+
/* receive a chunk: write it to disk & mirror it to another stream */
private void receiveChunk( int len ) throws IOException {
if (len <= 0 || len > bytesPerChecksum) {
@@ -2079,19 +2106,7 @@
mirrorOut.writeInt(len);
mirrorOut.write(buf, 0, len + checksumSize);
} catch (IOException ioe) {
- LOG.info(dnRegistration + ":Exception writing block " +
- block + " to mirror " + mirrorAddr + "\n" +
- StringUtils.stringifyException(ioe));
- mirrorOut = null;
- //
- // If stream-copy fails, continue
- // writing to disk for replication requests. For client
- // writes, return error so that the client can do error
- // recovery.
- //
- if (clientName.length() > 0) {
- throw ioe;
- }
+ handleMirrorOutError(ioe);
}
}
@@ -2139,26 +2154,19 @@
mirrorOut.writeLong(seqno);
mirrorOut.writeBoolean(lastPacketInBlock);
} catch (IOException e) {
- LOG.info("Exception writing to mirror " + mirrorAddr + "\n"
- + StringUtils.stringifyException(e));
- mirrorOut = null;
-
- // If stream-copy fails, continue
- // writing to disk for replication requests. For client
- // writes, return error so that the client can do error
- // recovery.
- //
- if (clientName.length() > 0) {
- throw e;
- }
+ handleMirrorOutError(e);
}
}
if (len == 0) {
LOG.info("Receiving empty packet for block " + block);
if (mirrorOut != null) {
- mirrorOut.writeInt(len);
- mirrorOut.flush();
+ try {
+ mirrorOut.writeInt(len);
+ mirrorOut.flush();
+ } catch (IOException e) {
+ handleMirrorOutError(e);
+ }
}
}
@@ -2174,7 +2182,11 @@
}
if (curPacketSize == packetSize) {
if (mirrorOut != null) {
- mirrorOut.flush();
+ try {
+ mirrorOut.flush();
+ } catch (IOException e) {
+ handleMirrorOutError(e);
+ }
}
break;
}
@@ -2198,15 +2210,15 @@
public void receiveBlock(
- DataOutputStream mirrorOut, // output to next datanode
- DataInputStream mirrorIn, // input from next datanode
+ DataOutputStream mirrOut, // output to next datanode
+ DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
- String mirrorAddr, Throttler throttler,
+ String mirrAddr, Throttler throttlerArg,
int numTargets) throws IOException {
- this.mirrorOut = mirrorOut;
- this.mirrorAddr = mirrorAddr;
- this.throttler = throttler;
+ mirrorOut = mirrOut;
+ mirrorAddr = mirrAddr;
+ throttler = throttlerArg;
try {
// write data chunk header
@@ -2216,7 +2228,7 @@
}
if (clientName.length() > 0) {
responder = new Daemon(threadGroup,
- new PacketResponder(this, block, mirrorIn,
+ new PacketResponder(this, block, mirrIn,
replyOut, numTargets,
clientName));
responder.start(); // start thread to processes reponses
@@ -2233,8 +2245,12 @@
// flush the mirror out
if (mirrorOut != null) {
- mirrorOut.writeInt(0); // mark the end of the block
- mirrorOut.flush();
+ try {
+ mirrorOut.writeInt(0); // mark the end of the block
+ mirrorOut.flush();
+ } catch (IOException e) {
+ handleMirrorOutError(e);
+ }
}
// wait for all outstanding packet responses. And then