You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2005/09/09 20:24:03 UTC

svn commit: r279840 - in /lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs: ClientProtocol.java DataNode.java FSNamesystem.java NDFSClient.java NameNode.java

Author: mc
Date: Fri Sep  9 11:24:00 2005
New Revision: 279840

URL: http://svn.apache.org/viewcvs?rev=279840&view=rev
Log:

  The NDFS Client will now report its own written blocks to
the NameNode; the receiving DataNodes used to do it.  This led
to all sorts of bad heartbeat race conditions among DataNodes,
and the NameNode had difficulty deciding when to replicate
partially-reported blocks.  No mas!


Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java?rev=279840&r1=279839&r2=279840&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java Fri Sep  9 11:24:00 2005
@@ -45,6 +45,13 @@
     public LocatedBlock addBlock(String src) throws IOException;
 
     /**
+     * The client wants to report a block it has just successfully
+     * written to one or more datanodes.  Client-written blocks are
+     * always reported by the client, not by the datanode.
+     */
+    public void reportWrittenBlock(LocatedBlock b) throws IOException;
+
+    /**
      * The client wants to abandon writing to the indicated block,
      * part of the indicated (currently-open) filename.
      */

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java?rev=279840&r1=279839&r2=279840&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java Fri Sep  9 11:24:00 2005
@@ -285,6 +285,7 @@
                         //
                         DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
                         try {
+                            boolean shouldReportBlock = in.readBoolean();
                             Block b = new Block();
                             b.readFields(in);
                             int numTargets = in.readInt();
@@ -302,11 +303,15 @@
 
                             //
                             // Make sure curTarget is equal to this machine
-                            // REMIND - mjc
                             //
                             DatanodeInfo curTarget = targets[0];
 
                             //
+                            // Track all the places we've successfully written the block
+                            //
+                            Vector mirrors = new Vector();
+
+                            //
                             // Open local disk out
                             //
                             DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));
@@ -329,6 +334,7 @@
 
                                         // Write connection header
                                         out2.write(OP_WRITE_BLOCK);
+                                        out2.writeBoolean(shouldReportBlock);
                                         b.write(out2);
                                         out2.writeInt(targets.length - 1);
                                         for (int i = 1; i < targets.length; i++) {
@@ -412,6 +418,12 @@
                                         if (complete != WRITE_COMPLETE) {
                                             LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);
                                         }
+                                        LocatedBlock newLB = new LocatedBlock();
+                                        newLB.readFields(in2);
+                                        DatanodeInfo mirrorsSoFar[] = newLB.getLocations();
+                                        for (int k = 0; k < mirrorsSoFar.length; k++) {
+                                            mirrors.add(mirrorsSoFar[k]);
+                                        }
                                         LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget);
                                     }
                                 } finally {
@@ -432,17 +444,25 @@
 
                             // 
                             // Tell the namenode that we've received this block 
-                            // in full.
+                            // in full, if we've been asked to.  This is done
+                            // during NameNode-directed block transfers, but not
+                            // client writes.
                             //
-                            synchronized (receivedBlockList) {
-                                receivedBlockList.add(b);
-                                receivedBlockList.notifyAll();
+                            if (shouldReportBlock) {
+                                synchronized (receivedBlockList) {
+                                    receivedBlockList.add(b);
+                                    receivedBlockList.notifyAll();
+                                }
                             }
 
                             //
-                            // Tell client job is done
+                            // Tell client job is done, and reply with
+                            // the new LocatedBlock.
                             //
                             reply.writeLong(WRITE_COMPLETE);
+                            mirrors.add(curTarget);
+                            LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));
+                            newLB.write(reply);
                         } finally {
                             reply.close();
                         }
@@ -582,6 +602,7 @@
                         // Header info
                         //
                         out.write(OP_WRITE_BLOCK);
+                        out.writeBoolean(true);
                         b.write(out);
                         out.writeInt(targets.length);
                         for (int i = 0; i < targets.length; i++) {

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=279840&r1=279839&r2=279840&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Fri Sep  9 11:24:00 2005
@@ -293,16 +293,6 @@
                 // Create next block
                 results[0] = allocateBlock(src);
                 results[1] = targets;
-            } else {
-                LOG.info("File progress failure for " + src);
-                Vector v = (Vector) pendingCreates.get(src);
-                for (Iterator it = v.iterator(); it.hasNext(); ) {
-                    Block b = (Block) it.next();
-                    TreeSet containingNodes = (TreeSet) blocksMap.get(b);
-                    if (containingNodes == null || containingNodes.size() < MIN_REPLICATION) {
-                        LOG.info("Problem with block " + b + ", with " + (containingNodes == null ? "0" : "" + containingNodes.size()) + " nodes reporting in.");
-                    }
-                }
             }
         }
         return results;

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java?rev=279840&r1=279839&r2=279840&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java Fri Sep  9 11:24:00 2005
@@ -623,6 +623,7 @@
                 //
                 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
                 out.write(OP_WRITE_BLOCK);
+                out.writeBoolean(false);
                 block.write(out);
                 out.writeInt(nodes.length);
                 for (int i = 0; i < nodes.length; i++) {
@@ -745,6 +746,7 @@
         }
 
         /**
+         * We're done writing to the current block.
          */
         private synchronized void endBlock() throws IOException {
             boolean mustRecover = ! blockStreamWorking;
@@ -754,16 +756,7 @@
             //
             if (blockStreamWorking) {
                 try {
-                    blockStream.writeLong(0);
-                    blockStream.flush();
-
-                    long complete = blockReplyStream.readLong();
-                    if (complete != WRITE_COMPLETE) {
-                        LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
-                        throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
-                    }
-                    blockStream.close();
-                    blockReplyStream.close();
+                    internalClose();
                 } catch (IOException ie) {
                     try {
                         blockStream.close();
@@ -799,8 +792,7 @@
                         blockStream.write(buf, 0, bytesRead);
                         bytesRead = in.read(buf);
                     }
-                    blockStream.writeLong(0);
-                    blockStream.close();
+                    internalClose();
                     LOG.info("Recovered from failed datanode connection");
                     mustRecover = false;
                 } catch (IOException ie) {
@@ -823,6 +815,28 @@
             backupFile.delete();
             backupFile = File.createTempFile("ndfsout", "bak");
             backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
+        }
+
+        /**
+         * Close down stream to remote datanode.  Called from two places
+         * in endBlock().
+         */
+        private synchronized void internalClose() throws IOException {
+            blockStream.writeLong(0);
+            blockStream.flush();
+
+            long complete = blockReplyStream.readLong();
+            if (complete != WRITE_COMPLETE) {
+                LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
+                throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
+            }
+                    
+            LocatedBlock lb = new LocatedBlock();
+            lb.readFields(blockReplyStream);
+            namenode.reportWrittenBlock(lb);
+
+            blockStream.close();
+            blockReplyStream.close();
         }
 
         /**

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java?rev=279840&r1=279839&r2=279840&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Fri Sep  9 11:24:00 2005
@@ -129,6 +129,20 @@
     }
 
     /**
+     * The client can report a block that it wrote, along with the datanodes holding it.
+     * These blocks are reported via the client instead of the datanode
+     * to prevent weird heartbeat race conditions.
+     */
+    public void reportWrittenBlock(LocatedBlock lb) throws IOException {
+        Block b = lb.getBlock();
+        DatanodeInfo targets[] = lb.getLocations();
+        for (int i = 0; i < targets.length; i++) {
+            namesystem.blockReceived(b, targets[i].getName());
+        }
+    }
+
+    /**
+     * The client needs to give up on the block.
      */
     public void abandonBlock(Block b, String src) throws IOException {
         if (! namesystem.abandonBlock(b, new UTF8(src))) {