You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2005/09/09 20:24:03 UTC
svn commit: r279840 - in
/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs:
ClientProtocol.java DataNode.java FSNamesystem.java NDFSClient.java
NameNode.java
Author: mc
Date: Fri Sep 9 11:24:00 2005
New Revision: 279840
URL: http://svn.apache.org/viewcvs?rev=279840&view=rev
Log:
The NDFS client now reports the blocks it writes to the
NameNode itself; previously, the receiving DataNodes reported
them. That approach caused heartbeat race conditions among
DataNodes, and the NameNode had difficulty deciding when to
replicate partially-reported blocks. No más!
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java?rev=279840&r1=279839&r2=279840&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java Fri Sep 9 11:24:00 2005
@@ -45,6 +45,13 @@
public LocatedBlock addBlock(String src) throws IOException;
/**
+ * The client wants to report a block it has just successfully
+ * written to one or more datanodes. Client-written blocks are
+ * always reported by the client, not by the datanode.
+ */
+ public void reportWrittenBlock(LocatedBlock b) throws IOException;
+
+ /**
* The client wants to abandon writing to the indicated block,
* part of the indicated (currently-open) filename.
*/
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java?rev=279840&r1=279839&r2=279840&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java Fri Sep 9 11:24:00 2005
@@ -285,6 +285,7 @@
//
DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
try {
+ boolean shouldReportBlock = in.readBoolean();
Block b = new Block();
b.readFields(in);
int numTargets = in.readInt();
@@ -302,11 +303,15 @@
//
// Make sure curTarget is equal to this machine
- // REMIND - mjc
//
DatanodeInfo curTarget = targets[0];
//
+ // Track all the places we've successfully written the block
+ //
+ Vector mirrors = new Vector();
+
+ //
// Open local disk out
//
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));
@@ -329,6 +334,7 @@
// Write connection header
out2.write(OP_WRITE_BLOCK);
+ out2.writeBoolean(shouldReportBlock);
b.write(out2);
out2.writeInt(targets.length - 1);
for (int i = 1; i < targets.length; i++) {
@@ -412,6 +418,12 @@
if (complete != WRITE_COMPLETE) {
LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);
}
+ LocatedBlock newLB = new LocatedBlock();
+ newLB.readFields(in2);
+ DatanodeInfo mirrorsSoFar[] = newLB.getLocations();
+ for (int k = 0; k < mirrorsSoFar.length; k++) {
+ mirrors.add(mirrorsSoFar[k]);
+ }
LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget);
}
} finally {
@@ -432,17 +444,25 @@
//
// Tell the namenode that we've received this block
- // in full.
+ // in full, if we've been asked to. This is done
+ // during NameNode-directed block transfers, but not
+ // client writes.
//
- synchronized (receivedBlockList) {
- receivedBlockList.add(b);
- receivedBlockList.notifyAll();
+ if (shouldReportBlock) {
+ synchronized (receivedBlockList) {
+ receivedBlockList.add(b);
+ receivedBlockList.notifyAll();
+ }
}
//
- // Tell client job is done
+ // Tell client job is done, and reply with
+ // the new LocatedBlock.
//
reply.writeLong(WRITE_COMPLETE);
+ mirrors.add(curTarget);
+ LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));
+ newLB.write(reply);
} finally {
reply.close();
}
@@ -582,6 +602,7 @@
// Header info
//
out.write(OP_WRITE_BLOCK);
+ out.writeBoolean(true);
b.write(out);
out.writeInt(targets.length);
for (int i = 0; i < targets.length; i++) {
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=279840&r1=279839&r2=279840&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Fri Sep 9 11:24:00 2005
@@ -293,16 +293,6 @@
// Create next block
results[0] = allocateBlock(src);
results[1] = targets;
- } else {
- LOG.info("File progress failure for " + src);
- Vector v = (Vector) pendingCreates.get(src);
- for (Iterator it = v.iterator(); it.hasNext(); ) {
- Block b = (Block) it.next();
- TreeSet containingNodes = (TreeSet) blocksMap.get(b);
- if (containingNodes == null || containingNodes.size() < MIN_REPLICATION) {
- LOG.info("Problem with block " + b + ", with " + (containingNodes == null ? "0" : "" + containingNodes.size()) + " nodes reporting in.");
- }
- }
}
}
return results;
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java?rev=279840&r1=279839&r2=279840&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java Fri Sep 9 11:24:00 2005
@@ -623,6 +623,7 @@
//
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
out.write(OP_WRITE_BLOCK);
+ out.writeBoolean(false);
block.write(out);
out.writeInt(nodes.length);
for (int i = 0; i < nodes.length; i++) {
@@ -745,6 +746,7 @@
}
/**
+ * We're done writing to the current block.
*/
private synchronized void endBlock() throws IOException {
boolean mustRecover = ! blockStreamWorking;
@@ -754,16 +756,7 @@
//
if (blockStreamWorking) {
try {
- blockStream.writeLong(0);
- blockStream.flush();
-
- long complete = blockReplyStream.readLong();
- if (complete != WRITE_COMPLETE) {
- LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
- throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
- }
- blockStream.close();
- blockReplyStream.close();
+ internalClose();
} catch (IOException ie) {
try {
blockStream.close();
@@ -799,8 +792,7 @@
blockStream.write(buf, 0, bytesRead);
bytesRead = in.read(buf);
}
- blockStream.writeLong(0);
- blockStream.close();
+ internalClose();
LOG.info("Recovered from failed datanode connection");
mustRecover = false;
} catch (IOException ie) {
@@ -823,6 +815,28 @@
backupFile.delete();
backupFile = File.createTempFile("ndfsout", "bak");
backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
+ }
+
+ /**
+ * Close down the stream to the remote datanode and report the
+ * written block to the namenode. Called twice from endBlock().
+ */
+ private synchronized void internalClose() throws IOException {
+ blockStream.writeLong(0);
+ blockStream.flush();
+
+ long complete = blockReplyStream.readLong();
+ if (complete != WRITE_COMPLETE) {
+ LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
+ throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
+ }
+
+ LocatedBlock lb = new LocatedBlock();
+ lb.readFields(blockReplyStream);
+ namenode.reportWrittenBlock(lb);
+
+ blockStream.close();
+ blockReplyStream.close();
}
/**
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java?rev=279840&r1=279839&r2=279840&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Fri Sep 9 11:24:00 2005
@@ -129,6 +129,20 @@
}
/**
+ * The client reports a block it just wrote, with every datanode holding it.
+ * These blocks are reported via the client instead of the datanode
+ * to prevent weird heartbeat race conditions.
+ */
+ public void reportWrittenBlock(LocatedBlock lb) throws IOException {
+ Block b = lb.getBlock();
+ DatanodeInfo targets[] = lb.getLocations();
+ for (int i = 0; i < targets.length; i++) {
+ namesystem.blockReceived(b, targets[i].getName());
+ }
+ }
+
+ /**
+ * The client needs to give up on the block.
*/
public void abandonBlock(Block b, String src) throws IOException {
if (! namesystem.abandonBlock(b, new UTF8(src))) {