You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2005/07/31 21:54:18 UTC

svn commit: r226687 - in /lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs: ClientProtocol.java FSNamesystem.java NDFSClient.java NameNode.java

Author: mc
Date: Sun Jul 31 12:54:15 2005
New Revision: 226687

URL: http://svn.apache.org/viewcvs?rev=226687&view=rev
Log:

  Fix two problems:

  1) Revamp the NDFSClientInputStream to be less complicated.
     We can thus easily add a read(byte b[]) method.

  2) Add a NameNode.abandonFileInProgress() method.  The client
     calls this if there's a problem during file-create.  This
     will eliminate the "pendingCreates non-null" exception that
     is sometimes seen. 

     (The problem occurs when the client cannot connect to a datanode
      for the first block in the file.  In that case we should abandon
      the entire file, not just the block as we were doing previously.)


Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java?rev=226687&r1=226686&r2=226687&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java Sun Jul 31 12:54:15 2005
@@ -51,6 +51,12 @@
     public void abandonBlock(Block b, String src) throws IOException;
 
     /**
+     * The client wants to abandon writing to the current file, and
+     * let anyone else grab it.
+     */
+    public void abandonFileInProgress(String src) throws IOException;
+
+    /**
      * The client is done writing data to the given filename, and would 
      * like to complete it.  Returns whether the file has been closed
      * correctly (true) or whether caller should try again (false).

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=226687&r1=226686&r2=226687&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Sun Jul 31 12:54:15 2005
@@ -317,6 +317,13 @@
     }
 
     /**
+     * Abandon the entire file in progress
+     */
+    public synchronized void abandonFileInProgress(UTF8 src) throws IOException {
+        internalReleaseCreate(src);
+    }
+
+    /**
      * Finalize the created file and make it world-accessible.  The
      * FSNamesystem will already know the blocks that make up the file.
      * Before we return, we make sure that all the file's blocks have 
@@ -578,7 +585,10 @@
                 internalReleaseLock(src, holder);
             }
             locks.clear();
-            internalReleaseCreates(creates);
+            for (Iterator it = creates.iterator(); it.hasNext(); ) {
+                UTF8 src = (UTF8) it.next();
+                internalReleaseCreate(src);
+            }
             creates.clear();
         }
 
@@ -682,14 +692,11 @@
     private int internalReleaseLock(UTF8 src, UTF8 holder) {
         return dir.releaseLock(src, holder);
     }
-    private void internalReleaseCreates(TreeSet creates) {
-        for (Iterator it = creates.iterator(); it.hasNext(); ) {
-            UTF8 src = (UTF8) it.next();
-            Vector v = (Vector) pendingCreates.remove(src);
-            for (Iterator it2 = v.iterator(); it2.hasNext(); ) {
-                Block b = (Block) it2.next();
-                pendingCreateBlocks.remove(b);
-            }
+    private void internalReleaseCreate(UTF8 src) {
+        Vector v = (Vector) pendingCreates.remove(src);
+        for (Iterator it2 = v.iterator(); it2.hasNext(); ) {
+            Block b = (Block) it2.next();
+            pendingCreateBlocks.remove(b);
         }
     }
 

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java?rev=226687&r1=226686&r2=226687&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java Sun Jul 31 12:54:15 2005
@@ -236,16 +236,10 @@
         private DataInputStream blockStream;
         private DataOutputStream partnerStream;
         private Block blocks[];
-        private int curBlock = 0;
         private DatanodeInfo nodes[][];
         private long pos = 0;
-        private long bytesRemainingInBlock = 0, curBlockSize = 0;
-
-        private int memoryBuf[] = new int[32 * 1024];
-        private long memoryStartPos = 0;
-        private long openPoint = 0;
-        private int memoryBytes = 0;
-        private int memoryBytesStart = 0;
+        private long filelen = 0;
+        private long blockEnd = -1;
 
         /**
          */
@@ -254,23 +248,19 @@
             this.nodes = nodes;
             this.blockStream = null;
             this.partnerStream = null;
+            for (int i = 0; i < blocks.length; i++) {
+                this.filelen += blocks[i].getNumBytes();
+            }
         }
 
         /**
-         * Open a DataInputStream to a DataNode so that it can be written to.
-         * This happens when a file is created and each time a new block is allocated.
-         * Must get block ID and the IDs of the destinations from the namenode.
+         * Open a DataInputStream to a DataNode so that it can be read from.
+         * We get block ID and the IDs of the destinations at startup, from the namenode.
          */
-        private synchronized void nextBlockInputStream() throws IOException {
-            nextBlockInputStream(0);
-        }
-        private synchronized void nextBlockInputStream(long preSkip) throws IOException {
-            if (curBlock >= blocks.length) {
+        private synchronized void blockSeekTo(long target) throws IOException {
+            if (target >= filelen) {
                 throw new IOException("Attempted to read past end of file");
             }
-            if (bytesRemainingInBlock > 0) {
-                throw new IOException("Trying to skip to next block without reading all data");
-            }
 
             if (blockStream != null) {
                 blockStream.close();
@@ -278,17 +268,39 @@
             }
 
             //
-            // Connect to best DataNode for current Block
+            // Compute desired block
+            //
+            int targetBlock = -1;
+            long targetBlockStart = 0;
+            long targetBlockEnd = 0;
+            for (int i = 0; i < blocks.length; i++) {
+                long blocklen = blocks[i].getNumBytes();
+                targetBlockEnd = targetBlockStart + blocklen - 1;
+
+                if (target >= targetBlockStart && target <= targetBlockEnd) {
+                    targetBlock = i;
+                    break;
+                } else {
+                    targetBlockStart = targetBlockEnd + 1;                    
+                }
+            }
+            if (targetBlock < 0) {
+                throw new IOException("Impossible situation: could not find target position " + target);
+            }
+            long offsetIntoBlock = target - targetBlockStart;
+
+            //
+            // Connect to best DataNode for desired Block, with potential offset
             //
-            InetSocketAddress target = null;
+            InetSocketAddress targetAddr = null;
             Socket s = null;
             TreeSet deadNodes = new TreeSet();
             while (s == null) {
                 DatanodeInfo chosenNode;
 
                 try {
-                    chosenNode = bestNode(nodes[curBlock], deadNodes);
-                    target = DataNode.createSocketAddr(chosenNode.getName().toString());
+                    chosenNode = bestNode(nodes[targetBlock], deadNodes);
+                    targetAddr = DataNode.createSocketAddr(chosenNode.getName().toString());
                 } catch (IOException ie) {
                     LOG.info("Could not obtain block from any node.  Retrying...");
                     try {
@@ -299,37 +311,34 @@
                     continue;
                 }
                 try {
-                    s = new Socket(target.getAddress(), target.getPort());
-                    //LOG.info("Now downloading from " + target + ", block " + blocks[curBlock] + ", skipahead " + preSkip);
+                    s = new Socket(targetAddr.getAddress(), targetAddr.getPort());
 
                     //
                     // Xmit header info to datanode
                     //
                     DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
                     out.write(OP_READSKIP_BLOCK);
-                    blocks[curBlock].write(out);
-                    out.writeLong(preSkip);
+                    blocks[targetBlock].write(out);
+                    out.writeLong(offsetIntoBlock);
                     out.flush();
 
                     //
                     // Get bytes in block, set streams
                     //
                     DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
-                    curBlockSize = in.readLong();
+                    long curBlockSize = in.readLong();
                     long amtSkipped = in.readLong();
-
-                    pos += amtSkipped;
-                    bytesRemainingInBlock = curBlockSize - amtSkipped;
-
-                    if (amtSkipped > 0) {
-                        memoryStartPos = pos;
-                        memoryBytes = 0;
-                        memoryBytesStart = 0;
+                    if (curBlockSize != blocks[targetBlock].len) {
+                        throw new IOException("Recorded block size is " + blocks[targetBlock].len + ", but datanode reports size of " + curBlockSize);
+                    }
+                    if (amtSkipped != offsetIntoBlock) {
+                        throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped);
                     }
-                    blockStream = in;
-                    partnerStream = out;
-                    curBlock++;
-                    openPoint = pos;
+
+                    this.pos = target;
+                    this.blockEnd = targetBlockEnd;
+                    this.blockStream = in;
+                    this.partnerStream = out;
                 } catch (IOException ex) {
                     // Put chosen node into dead list, continue
                     LOG.info("Could not connect to " + target);
@@ -340,242 +349,110 @@
         }
 
         /**
+         * Close it down!
          */
-        public synchronized void seek(long pos) throws IOException {
-            if (pos < 0) {
-                throw new IOException("Cannot seek to negative position " + pos);
+        public synchronized void close() throws IOException {
+            if (closed) {
+                throw new IOException("Stream closed");
             }
-            if (pos == this.pos) {
-                return;
+
+            if (blockStream != null) {
+                blockStream.close();
+                blockStream = null;
+                partnerStream.close();
             }
+            super.close();
+            closed = true;
+        }
 
-            //
-            // If we have remembered enough bytes to seek backwards to the
-            // desired pos, we can do so easily
-            //
-            if ((pos >= memoryStartPos) && (memoryStartPos + memoryBytes > pos)) {
-                this.pos = pos;
-            } else {
-                //
-                // If we are seeking backwards (and *don't* have enough memory bytes)
-                // we need to reset the NDFS streams.  They will be reopened upon the
-                // next call to nextBlockInputStream().  After this operation, all
-                // seeks will be "forwardSeeks".
-                //
-                if (pos < memoryStartPos && blockStream != null) {
-                    blockStream.close();
-                    blockStream = null;
-                    partnerStream.close();
-                    partnerStream = null;
-                    this.curBlock = 0;
-                    this.bytesRemainingInBlock = 0;
-                    this.pos = 0;
-                    this.memoryStartPos = 0;
-                    this.memoryBytes = 0;
-                    this.memoryBytesStart = 0;
-                    //
-                    // REMIND - this could be made more efficient, to just
-                    // skip back block-by-block
-                    //
+        /**
+         * Basic read()
+         */
+        public synchronized int read() throws IOException {
+            if (closed) {
+                throw new IOException("Stream closed");
+            }
+            int result = -1;
+            if (pos < filelen) {
+                if (pos > blockEnd) {
+                    blockSeekTo(pos);
                 }
-
-                //
-                // Now read ahead to the desired position.
-                //
-                long diff = pos - this.pos;
-                while (diff > 0) {
-                    long skipped = skip(diff);
-                    if (skipped > 0) {
-                        diff -= skipped;
-                    }
+                result = blockStream.read();
+                if (result >= 0) {
+                    pos++;
                 }
-                // Pos will be incremented by skip()
             }
+            return result;
         }
 
         /**
-         * Skip ahead some number of bytes
+         * Read the entire buffer.
          */
-        public synchronized long skip(long skip) throws IOException {
-            long toSkip = 0;
-            long toFastSkip = 0;
-            if (skip > memoryBuf.length) {
-                toSkip = memoryBuf.length;
-                toFastSkip = skip - toSkip;
-            } else {
-                toSkip = skip;
-            }
-            long totalSkipped = 0;
-
-            //
-            // If there's a lot of fast-skipping to do within the current block,
-            // close it and reopen, so we can fast-skip to the target
-            //
-            /**
-            while (toFastSkip > 0) {
-                long amtSkipped = super.skip(toFastSkip);
-                toFastSkip -= amtSkipped;
-                totalSkipped += amtSkipped;
-            }
-            **/
-            long realBytesRemaining = bytesRemainingInBlock + (memoryBytes - (pos - memoryStartPos));
-            if (toFastSkip > 0 && realBytesRemaining > 0 && 
-                toFastSkip < realBytesRemaining) {
-
-                blockStream.close();
-                blockStream = null;
-                partnerStream.close();
-                partnerStream = null;
-
-                long backwardsDistance = curBlockSize - realBytesRemaining;
-                pos -= backwardsDistance;
-                totalSkipped -= backwardsDistance;
-                toFastSkip += backwardsDistance;
-                bytesRemainingInBlock = 0;
-                curBlock--;
-
-                memoryStartPos = pos;
-                memoryBytes = 0;
-                memoryBytesStart = 0;
+        public synchronized int read(byte buf[], int off, int len) throws IOException {
+            if (closed) {
+                throw new IOException("Stream closed");
             }
-
-            //
-            // If there's any fast-skipping to do, we do it by opening a
-            // new block and telling the datanode how many bytes to skip.
-            //
-            while (toFastSkip > 0 && curBlock < blocks.length) {
-
-                if (bytesRemainingInBlock > 0) {
-                    blockStream.close();
-                    blockStream = null;
-                    partnerStream.close();
-                    partnerStream = null;
-
-                    pos += bytesRemainingInBlock;
-                    totalSkipped += bytesRemainingInBlock;
-                    toFastSkip -= bytesRemainingInBlock;
-                    bytesRemainingInBlock = 0;
+            if (pos < filelen) {
+                if (pos > blockEnd) {
+                    blockSeekTo(pos);
                 }
-
-                long oldPos = pos;
-                nextBlockInputStream(toFastSkip);
-                long forwardDistance = (pos - oldPos);
-                totalSkipped += forwardDistance;
-                toFastSkip -= (pos - oldPos);
-
-                memoryStartPos = pos;
-                memoryBytes = 0;
-                memoryBytesStart = 0;
-            }
-
-            //
-            // If there's any remaining toFastSkip, well, there's
-            // not much we can do about it.  We're at the end of
-            // the stream!
-            //
-            if (toFastSkip > 0) {
-                System.err.println("Trying to skip past end of file....");
-                toFastSkip = 0;
+                int result = blockStream.read(buf, off, len);
+                if (result >= 0) {
+                    pos += result;
+                }
+                return result;
             }
-
-            //
-            // Do a slow skip as we approach, so we can fill the client
-            // history buffer
-            //
-            totalSkipped += super.skip(toSkip);
-            toSkip = 0;
-            return totalSkipped;
+            return -1;
         }
 
         /**
+         * Seek to a new arbitrary location
          */
-        public synchronized long getPos() throws IOException {
-            return pos;
+        public synchronized void seek(long targetPos) throws IOException {
+            if (targetPos >= filelen) {
+                throw new IOException("Cannot seek after EOF");
+            }
+            if (targetPos >= pos && targetPos <= blockEnd) {
+                skip(targetPos - pos);
+            } else {
+                pos = targetPos;
+                blockEnd = -1;
+            }
         }
 
         /**
+         * Skip ahead some number of bytes
          */
-        public synchronized int available() throws IOException {
-            if (closed) {
-                throw new IOException("Stream closed");
+        public synchronized long skip(long skip) throws IOException {
+            if (skip > 0) {
+                long targetPos = pos + skip;
+                targetPos = Math.min(targetPos, filelen);
+
+                if (targetPos <= blockEnd) {
+                    return blockStream.skip(skip);
+                } else {
+                    pos = targetPos;
+                    blockEnd = -1;
+                    return skip;
+                }
+            } else {
+                return 0;
             }
-            return (int) Math.min((long) Integer.MAX_VALUE, bytesRemainingInBlock);
         }
 
         /**
          */
-        public synchronized void close() throws IOException {
-            if (closed) {
-                throw new IOException("Stream closed");
-            }
-
-            if (blockStream != null) {
-                blockStream.close();
-                blockStream = null;
-                partnerStream.close();
-            }
-            super.close();
-            closed = true;
+        public synchronized long getPos() throws IOException {
+            return pos;
         }
 
         /**
-         * Other read() functions are implemented in terms of
-         * this one.
          */
-        public synchronized int read() throws IOException {
+        public synchronized int available() throws IOException {
             if (closed) {
                 throw new IOException("Stream closed");
             }
-
-            int b = 0;
-            if (pos - memoryStartPos < memoryBytes) {
-                //
-                // Move the memoryStartPos up to current pos, if necessary.
-                //
-                int diff = (int) (pos - memoryStartPos);
-
-                //
-                // Fetch the byte
-                //
-                b = memoryBuf[(memoryBytesStart + diff) % memoryBuf.length];
-
-                //
-                // Bump the pos
-                //
-                pos++;
-            } else {
-                if (bytesRemainingInBlock == 0) {
-                    if (curBlock < blocks.length) {
-                        nextBlockInputStream();
-                    } else {
-                        return -1;
-                    }
-                }
-                b = blockStream.read();
-                if (b >= 0) {
-                    //
-                    // Remember byte so we can seek backwards at some later time
-                    //
-                    if (memoryBytes == memoryBuf.length) {
-                        memoryStartPos++;
-                    }
-
-                    if (memoryBuf.length > 0) {
-                        int target;
-                        if (memoryBytes == memoryBuf.length) {
-                            target = memoryBytesStart;
-                            memoryBytesStart = (memoryBytesStart + 1) % memoryBuf.length;
-                        } else {
-                            target = (memoryBytesStart + memoryBytes) % memoryBuf.length;
-                            memoryBytes++;
-                        }
-                        memoryBuf[target] = b;
-                    }
-                    bytesRemainingInBlock--;
-                    pos++;
-                }
-            }
-            return b;
+            return (int) (filelen - pos);
         }
 
         /**
@@ -661,7 +538,6 @@
                             }
                         } catch (InterruptedException ie) {
                         }
-
                     } else {
                         blockComplete = true;
                     }
@@ -676,7 +552,6 @@
                 InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString());
                 Socket s = null;
                 try {
-                    //System.err.println("Trying to connect to " + target);
                     s = new Socket(target.getAddress(), target.getPort());
                 } catch (IOException ie) {
                     // Connection failed.  Let's wait a little bit and retry
@@ -687,7 +562,11 @@
                         Thread.sleep(6000);
                     } catch (InterruptedException iex) {
                     }
-                    namenode.abandonBlock(block, src.toString());
+                    if (firstTime) {
+                        namenode.abandonFileInProgress(src.toString());
+                    } else {
+                        namenode.abandonBlock(block, src.toString());
+                    }
                     retry = true;
                     continue;
                 }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java?rev=226687&r1=226686&r2=226687&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Sun Jul 31 12:54:15 2005
@@ -123,6 +123,9 @@
             throw new IOException("Cannot abandon block during write to " + src);
         }
     }
+    public void abandonFileInProgress(String src) throws IOException {
+        namesystem.abandonFileInProgress(new UTF8(src));
+    }
     public boolean complete(String src, String clientName) throws IOException {
         int returnCode = namesystem.completeFile(new UTF8(src), new UTF8(clientName));
         if (returnCode == STILL_WAITING) {