Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/03 02:18:00 UTC

svn commit: r452282 - in /lucene/hadoop/trunk: ./ src/c++/libhdfs/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/test/org/apache/hadoop/dfs/

Author: cutting
Date: Mon Oct  2 17:17:59 2006
New Revision: 452282

URL: http://svn.apache.org/viewvc?view=rev&rev=452282
Log:
HADOOP-519.  Add positioned read methods to FSInputStream.  Contributed by Milind.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/c++/libhdfs/hdfs.c
    lucene/hadoop/trunk/src/c++/libhdfs/hdfs.h
    lucene/hadoop/trunk/src/c++/libhdfs/hdfs_test.c
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Oct  2 17:17:59 2006
@@ -106,6 +106,12 @@
 26. HADOOP-566.  Fix scripts to work correctly when accessed through
     relative symbolic links.  (Lee Faris via cutting)
 
+27. HADOOP-519.  Add positioned read methods to FSInputStream.  These
+    permit one to read from a stream without moving its position, and
+    can hence be performed by multiple threads at once on a single
+    stream. Implement an optimized version for DFS and local FS.
+    (Milind Bhandarkar via cutting)
+
 
 Release 0.6.2 - 2006-09-18
 

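The new API can be exercised roughly as follows. This is a minimal usage sketch, not part of the commit; the file path, offsets, and buffer sizes are illustrative, and the "local" filesystem is chosen only because the new TestPread uses it:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PreadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getNamed("local", conf);   // any FileSystem would do
        final FSDataInputStream in = fs.open(new Path("build/test/data/preadtest.dat"));
        Thread t1 = new Thread() {
          public void run() {
            try {
              byte[] buf = new byte[4096];
              in.readFully(0L, buf);       // bytes 0-4095, stream offset untouched
            } catch (IOException e) { e.printStackTrace(); }
          }
        };
        Thread t2 = new Thread() {
          public void run() {
            try {
              byte[] buf = new byte[4096];
              in.readFully(8192L, buf);    // bytes 8192-12287, read concurrently
            } catch (IOException e) { e.printStackTrace(); }
          }
        };
        t1.start(); t2.start();
        t1.join();  t2.join();
        in.close();
        fs.close();
      }
    }

Because each positioned read leaves the stream's current offset alone, the two threads share one open stream without coordinating seeks.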
Modified: lucene/hadoop/trunk/src/c++/libhdfs/hdfs.c
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/c%2B%2B/libhdfs/hdfs.c?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/c++/libhdfs/hdfs.c (original)
+++ lucene/hadoop/trunk/src/c++/libhdfs/hdfs.c Mon Oct  2 17:17:59 2006
@@ -559,6 +559,58 @@
     return noReadBytes;
 }
   
+tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, void* buffer, tSize length)
+{
+    // JAVA EQUIVALENT:
+    //  byte [] bR = new byte[length];
+    //  fis.read(pos, bR, 0, length);
+
+    //Get the JNIEnv* corresponding to current thread
+    JNIEnv* env = getJNIEnv();
+
+    //Parameters
+    jobject jFS = (jobject)fs;
+    jobject jInputStream = (jobject)(f ? f->file : NULL);
+
+    jthrowable jException;
+    jbyteArray jbRarray;
+    jint noReadBytes = 0;
+
+    //Sanity check
+    if (!f || f->type == UNINITIALIZED) {
+        errno = EBADF;
+        return -1;
+    }
+
+    //Error checking... make sure that this file is 'readable'
+    if (f->type != INPUT) {
+        fprintf(stderr, "Cannot read from a non-InputStream object!\n");
+        errno = EINVAL;
+        return -1;
+    }
+
+    //Read the requisite bytes
+    jbRarray = (*env)->NewByteArray(env, length);
+    if (invokeMethod(env, (RetVal*)&noReadBytes, &jException, INSTANCE, 
+                jInputStream, "org/apache/hadoop/fs/FSDataInputStream", 
+                "read", "(J[BII)I", position, jbRarray, 0, length) != 0) {
+        fprintf(stderr, 
+            "Call to org.apache.hadoop.fs.FSDataInputStream::read failed!\n");
+        errno = EINTERNAL;
+        noReadBytes = -1;
+    } else {
+        if(noReadBytes > 0) {
+            (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
+        }
+        //This is a valid case: there aren't any bytes left to read!
+        errno = 0;
+    }
+    (*env)->ReleaseByteArrayElements(env, jbRarray, 
+                (*env)->GetByteArrayElements(env, jbRarray, 0), JNI_ABORT);
+
+    return noReadBytes;
+}
+
 tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
 {
     // JAVA EQUIVALENT

Modified: lucene/hadoop/trunk/src/c++/libhdfs/hdfs.h
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/c%2B%2B/libhdfs/hdfs.h?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/c++/libhdfs/hdfs.h (original)
+++ lucene/hadoop/trunk/src/c++/libhdfs/hdfs.h Mon Oct  2 17:17:59 2006
@@ -142,6 +142,17 @@
     tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
 
     /** 
+     * hdfsPread - Positional read of data from an open file.
+     * @param fs The configured filesystem handle.
+     * @param file The file handle.
+     * @param position Position from which to read.
+     * @param buffer The buffer to copy read bytes into.
+     * @param length The length of the buffer.
+     * @return Returns the number of bytes actually read, possibly less than length; -1 on error.
+     */
+    tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length);
+
+    /** 
      * hdfsWrite - Write data into an open file.
      * @param fs The configured filesystem handle.
      * @param file The file handle.

Modified: lucene/hadoop/trunk/src/c++/libhdfs/hdfs_test.c
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/c%2B%2B/libhdfs/hdfs_test.c?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/c++/libhdfs/hdfs_test.c (original)
+++ lucene/hadoop/trunk/src/c++/libhdfs/hdfs_test.c Mon Oct  2 17:17:59 2006
@@ -97,6 +97,11 @@
         fprintf(stderr, "Read following %d bytes:\n%s\n", 
                 num_read_bytes, buffer);
 
+        num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer, 
+                sizeof(buffer));
+        fprintf(stderr, "Read following %d bytes:\n%s\n", 
+                num_read_bytes, buffer);
+
         hdfsCloseFile(fs, readFile);
     }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Oct  2 17:17:59 2006
@@ -464,6 +464,16 @@
         }
     }
 
+    /** Utility class to encapsulate data node info and its ip address. */
+    private static class DNAddrPair {
+      DatanodeInfo info;
+      InetSocketAddress addr;
+      DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
+        this.info = info;
+        this.addr = addr;
+      }
+    }
+        
     /****************************************************************
      * DFSInputStream provides bytes from a named file.  It handles 
      * negotiation of the namenode and various datanodes as necessary.
@@ -494,7 +504,7 @@
         /**
          * Grab the open-file info from namenode
          */
-        void openInfo() throws IOException {
+        synchronized void openInfo() throws IOException {
             Block oldBlocks[] = this.blocks;
 
             LocatedBlock results[] = namenode.open(src);            
@@ -560,33 +570,12 @@
             // Connect to best DataNode for desired Block, with potential offset
             //
             int failures = 0;
-            InetSocketAddress targetAddr = null;
             TreeSet deadNodes = new TreeSet();
             while (s == null) {
-                DatanodeInfo chosenNode;
-
-                try {
-                    chosenNode = bestNode(nodes[targetBlock], deadNodes);
-                    targetAddr = DataNode.createSocketAddr(chosenNode.getName());
-                } catch (IOException ie) {
-                    String blockInfo =
-                      blocks[targetBlock]+" file="+src+" offset="+target;
-                    if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
-                        throw new IOException("Could not obtain block: " + blockInfo);
-                    }
-                    if (nodes[targetBlock] == null || nodes[targetBlock].length == 0) {
-                        LOG.info("No node available for block: " + blockInfo);
-                    }
-                    LOG.info("Could not obtain block from any node:  " + ie);
-                    try {
-                        Thread.sleep(3000);
-                    } catch (InterruptedException iex) {
-                    }
-                    deadNodes.clear();
-                    openInfo();
-                    failures++;
-                    continue;
-                }
+                DNAddrPair retval = chooseDataNode(targetBlock, deadNodes);
+                DatanodeInfo chosenNode = retval.info;
+                InetSocketAddress targetAddr = retval.addr;
+            
                 try {
                     s = new Socket();
                     s.connect(targetAddr, READ_TIMEOUT);
@@ -704,11 +693,142 @@
             return -1;
         }
 
+        
+        private DNAddrPair chooseDataNode(int blockId, TreeSet deadNodes)
+        throws IOException {
+          int failures = 0;
+          while (true) {
+            try {
+              DatanodeInfo chosenNode = bestNode(nodes[blockId], deadNodes);
+              InetSocketAddress targetAddr = DataNode.createSocketAddr(chosenNode.getName());
+              return new DNAddrPair(chosenNode, targetAddr);
+            } catch (IOException ie) {
+              String blockInfo =
+                  blocks[blockId]+" file="+src;
+              if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
+                throw new IOException("Could not obtain block: " + blockInfo);
+              }
+              if (nodes[blockId] == null || nodes[blockId].length == 0) {
+                LOG.info("No node available for block: " + blockInfo);
+              }
+              LOG.info("Could not obtain block from any node:  " + ie);
+              try {
+                Thread.sleep(3000);
+              } catch (InterruptedException iex) {
+              }
+              deadNodes.clear();
+              openInfo();
+              failures++;
+              continue;
+            }
+          }
+        } 
+        
+        private void fetchBlockByteRange(int blockId, long start,
+            long end, byte[] buf, int offset) throws IOException {
+          //
+          // Connect to best DataNode for desired Block, with potential offset
+          //
+          TreeSet deadNodes = new TreeSet();
+          Socket dn = null;
+          while (dn == null) {
+            DNAddrPair retval = chooseDataNode(blockId, deadNodes);
+            DatanodeInfo chosenNode = retval.info;
+            InetSocketAddress targetAddr = retval.addr;
+            
+            try {
+              dn = new Socket();
+              dn.connect(targetAddr, READ_TIMEOUT);
+              dn.setSoTimeout(READ_TIMEOUT);
+              
+              //
+              // Xmit header info to datanode
+              //
+              DataOutputStream out = new DataOutputStream(new BufferedOutputStream(dn.getOutputStream()));
+              out.write(OP_READ_RANGE_BLOCK);
+              blocks[blockId].write(out);
+              out.writeLong(start);
+              out.writeLong(end);
+              out.flush();
+              
+              //
+              // Get bytes in block, set streams
+              //
+              DataInputStream in = new DataInputStream(new BufferedInputStream(dn.getInputStream()));
+              long curBlockSize = in.readLong();
+              long actualStart = in.readLong();
+              long actualEnd = in.readLong();
+              if (curBlockSize != blocks[blockId].len) {
+                throw new IOException("Recorded block size is " +
+                    blocks[blockId].len + ", but datanode reports size of " +
+                    curBlockSize);
+              }
+              if ((actualStart != start) || (actualEnd != end)) {
+                throw new IOException("Asked for byte range  " + start +
+                    "-" + end + ", but only received range " + actualStart +
+                    "-" + actualEnd);
+              }
+              int nread = in.read(buf, offset, (int)(end - start + 1));
+            } catch (IOException ex) {
+              // Put chosen node into dead list, continue
+              LOG.info("Failed to connect to " + targetAddr + ":" + ex);
+              deadNodes.add(chosenNode);
+              if (dn != null) {
+                try {
+                  dn.close();
+                } catch (IOException iex) {
+                }
+              }
+              dn = null;
+            }
+          }
+        }
+        
+        public int read(long position, byte[] buf, int off, int len)
+        throws IOException {
+          // sanity checks
+          checkOpen();
+          if (closed) {
+            throw new IOException("Stream closed");
+          }
+          if ((position < 0) || (position > filelen)) {
+            return -1;
+          }
+          int realLen = len;
+          if ((position + len) > filelen) {
+            realLen = (int)(filelen - position);
+          }
+          // determine the block and byte range within the block
+          // corresponding to position and realLen
+          int targetBlock = -1;
+          long targetStart = 0;
+          long targetEnd = 0;
+          for (int idx = 0; idx < blocks.length; idx++) {
+            long blocklen = blocks[idx].getNumBytes();
+            targetEnd = targetStart + blocklen - 1;
+            if (position >= targetStart && position <= targetEnd) {
+              targetBlock = idx;
+              targetStart = position - targetStart;
+              targetEnd = Math.min(blocklen, targetStart + realLen) - 1;
+              realLen = (int)(targetEnd - targetStart + 1);
+              break;
+            }
+            targetStart += blocklen;
+          }
+          if (targetBlock < 0) {
+            throw new IOException(
+                "Impossible situation: could not find target position "+
+                position);
+          }
+          fetchBlockByteRange(targetBlock, targetStart, targetEnd, buf, off);
+          return realLen;
+        }
+        
         /**
          * Seek to a new arbitrary location
          */
         public synchronized void seek(long targetPos) throws IOException {
-            if (targetPos >= filelen) {
+            if (targetPos > filelen) {
                 throw new IOException("Cannot seek after EOF");
             }
             pos = targetPos;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon Oct  2 17:17:59 2006
@@ -510,7 +510,8 @@
                     byte op = (byte) in.read();
                     if (op == OP_WRITE_BLOCK) {
                         writeBlock(in);
-                    } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK) {
+                    } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK ||
+                        op == OP_READ_RANGE_BLOCK) {
                         readBlock(in, op);
                     } else {
                         while (op >= 0) {
@@ -548,8 +549,12 @@
           b.readFields(in);
 
           long toSkip = 0;
+          long endOffset = -1;
           if (op == OP_READSKIP_BLOCK) {
               toSkip = in.readLong();
+          } else if (op == OP_READ_RANGE_BLOCK) {
+            toSkip = in.readLong();
+            endOffset = in.readLong();
           }
 
           //
@@ -567,14 +572,15 @@
                   // Get blockdata from disk
                   //
                   long len = data.getLength(b);
+                  if (endOffset < 0) { endOffset = len; }
                   DataInputStream in2 = new DataInputStream(data.getBlockData(b));
                   out.writeLong(len);
 
-                  if (op == OP_READSKIP_BLOCK) {
+                  long amtSkipped = 0;
+                  if ((op == OP_READSKIP_BLOCK) || (op == OP_READ_RANGE_BLOCK)) {
                       if (toSkip > len) {
                           toSkip = len;
                       }
-                      long amtSkipped = 0;
                       try {
                           amtSkipped = in2.skip(toSkip);
                       } catch (IOException iex) {
@@ -583,26 +589,35 @@
                       }
                       out.writeLong(amtSkipped);
                   }
+                  if (op == OP_READ_RANGE_BLOCK) {
+                      if (endOffset > len) {
+                        endOffset = len;
+                      }
+                      out.writeLong(endOffset);
+                  }
 
                   byte buf[] = new byte[BUFFER_SIZE];
                   try {
+                    int toRead = (int) (endOffset - amtSkipped + 1);
                       int bytesRead = 0;
                       try {
-                          bytesRead = in2.read(buf);
+                          bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead));
                           myMetrics.readBytes(bytesRead);
                       } catch (IOException iex) {
                           shutdown();
                           throw iex;
                       }
-                      while (bytesRead >= 0) {
+                      while (toRead > 0 && bytesRead >= 0) {
                           out.write(buf, 0, bytesRead);
-                          len -= bytesRead;
+                          toRead -= bytesRead;
+                          if (toRead > 0) {
                           try {
-                              bytesRead = in2.read(buf);
+                              bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead));
                               myMetrics.readBytes(bytesRead);
                           } catch (IOException iex) {
                               shutdown();
                               throw iex;
+                          }
                           }
                       }
                   } catch (SocketException se) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Mon Oct  2 17:17:59 2006
@@ -86,6 +86,7 @@
     public static final byte OP_WRITE_BLOCK = (byte) 80;
     public static final byte OP_READ_BLOCK = (byte) 81;
     public static final byte OP_READSKIP_BLOCK = (byte) 82;
+    public static final byte OP_READ_RANGE_BLOCK = (byte) 83;
 
     // Encoding types
     public static final byte RUNLENGTH_ENCODING = 0;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java Mon Oct  2 17:17:59 2006
@@ -26,7 +26,8 @@
 
 /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
  * and buffers input through a {@link BufferedInputStream}. */
-public class FSDataInputStream extends DataInputStream {
+public class FSDataInputStream extends DataInputStream
+    implements Seekable, PositionedReadable {
   private static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.fs.DataInputStream");
 
@@ -36,7 +37,8 @@
   private int bytesPerSum = 1;
   
   /** Verify that data matches checksums. */
-  private class Checker extends FilterInputStream implements Seekable {
+  private class Checker extends FilterInputStream
+      implements Seekable, PositionedReadable {
     private FileSystem fs;
     private Path file;
     private FSDataInputStream sums;
@@ -139,6 +141,21 @@
       return ((FSInputStream)in).getPos();
     }
 
+    public int read(long position, byte[] buffer, int offset, int length)
+    throws IOException {
+      return ((FSInputStream)in).read(position, buffer, offset, length);
+    }
+    
+    public void readFully(long position, byte[] buffer, int offset, int length)
+    throws IOException {
+      ((FSInputStream)in).readFully(position, buffer, offset, length);
+    }
+    
+    public void readFully(long position, byte[] buffer)
+    throws IOException {
+      ((FSInputStream)in).readFully(position, buffer);
+    }
+
     public void close() throws IOException {
       super.close();
       stopSumming();
@@ -181,6 +198,16 @@
       return position;                            // return cached position
     }
     
+    public int read(long position, byte[] buffer, int offset, int length)
+    throws IOException {
+      return ((PositionedReadable)in).read(position, buffer, offset, length);
+    }
+    
+    public void readFully(long position, byte[] buffer, int offset, int length)
+    throws IOException {
+      ((PositionedReadable)in).readFully(position, buffer, offset, length);
+    }
+    
   }
 
   /** Buffer input.  This improves performance significantly.*/
@@ -224,6 +251,15 @@
       return buf[pos++] & 0xff;
     }
 
+    public int read(long position, byte[] buffer, int offset, int length)
+    throws IOException {
+      return ((PositionCache)in).read(position, buffer, offset, length);
+    }
+    
+    public void readFully(long position, byte[] buffer, int offset, int length)
+    throws IOException {
+      ((PositionCache)in).readFully(position, buffer, offset, length);
+    }
 }
   
   
@@ -252,7 +288,7 @@
     this.in = new Buffer(new PositionCache(in), bufferSize);
   }
   
-  public void seek(long desired) throws IOException {
+  public synchronized void seek(long desired) throws IOException {
     ((Buffer)in).seek(desired);
   }
 
@@ -260,4 +296,18 @@
     return ((Buffer)in).getPos();
   }
 
+  public int read(long position, byte[] buffer, int offset, int length)
+  throws IOException {
+    return ((Buffer)in).read(position, buffer, offset, length);
+  }
+  
+  public void readFully(long position, byte[] buffer, int offset, int length)
+  throws IOException {
+    ((Buffer)in).readFully(position, buffer, offset, length);
+  }
+  
+  public void readFully(long position, byte[] buffer)
+  throws IOException {
+    ((Buffer)in).readFully(position, buffer, 0, buffer.length);
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java Mon Oct  2 17:17:59 2006
@@ -23,7 +23,8 @@
  *
  * @author Mike Cafarella
  *****************************************************************/
-public abstract class FSInputStream extends InputStream implements Seekable {
+public abstract class FSInputStream extends InputStream
+    implements Seekable, PositionedReadable {
     /**
      * Seek to the given offset from the start of the file.
      * The next read() will be from that location.  Can't
@@ -35,4 +36,36 @@
      * Return the current offset from the start of the file
      */
     public abstract long getPos() throws IOException;
+    
+    public int read(long position, byte[] buffer, int offset, int length)
+    throws IOException {
+      synchronized (this) {
+        long oldPos = getPos();
+        int nread = -1;
+        try {
+          seek(position);
+          nread = read(buffer, offset, length);
+        } finally {
+          seek(oldPos);
+        }
+        return nread;
+      }
+    }
+    
+    public void readFully(long position, byte[] buffer, int offset, int length)
+    throws IOException {
+      int nread = 0;
+      while (nread < length) {
+        int nbytes = read(position+nread, buffer, offset+nread, length-nread);
+        if (nbytes < 0) {
+          throw new EOFException("End of file reached before reading fully.");
+        }
+        nread += nbytes;
+      }
+    }
+    
+    public void readFully(long position, byte[] buffer)
+    throws IOException {
+      readFully(position, buffer, 0, buffer.length);
+    }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Mon Oct  2 17:17:59 2006
@@ -17,6 +17,7 @@
 package org.apache.hadoop.fs;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.util.*;
 import java.nio.channels.*;
 import org.apache.hadoop.conf.Configuration;
@@ -113,6 +114,16 @@
           }
         }
 
+        public int read(long position, byte[] b, int off, int len)
+        throws IOException {
+          ByteBuffer bb = ByteBuffer.wrap(b, off, len);
+          try {
+            return fis.getChannel().read(bb, position);
+          } catch (IOException e) {
+            throw new FSError(e);
+          }
+        }
+        
         public long skip(long n) throws IOException { return fis.skip(n); }
     }
     

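For context on the LocalFileSystem change above: java.nio's FileChannel offers a positional read that does not move the channel's own position, which is what lets the local implementation serve preads without seeking. A standalone sketch, with an illustrative file name:

    import java.io.FileInputStream;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    public class ChannelPreadSketch {
      public static void main(String[] args) throws Exception {
        FileInputStream fis = new FileInputStream("build/test/data/preadtest.dat");
        FileChannel ch = fis.getChannel();
        ByteBuffer bb = ByteBuffer.allocate(1024);
        long before = ch.position();
        int n = ch.read(bb, 4096L);        // fills bb from file offset 4096
        long after = ch.position();        // unchanged: positional reads don't seek
        System.out.println("read " + n + " bytes; position " + before + " -> " + after);
        ch.close();
        fis.close();
      }
    }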
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java?view=auto&rev=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java Mon Oct  2 17:17:59 2006
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.*;
+import org.apache.hadoop.fs.*;
+
+/** Stream that permits positional reading. */
+public interface PositionedReadable {
+  /**
+   * Read up to the specified number of bytes, from a given
+   * position within a file, and return the number of bytes read. This does not
+   * change the current offset of a file, and is thread-safe.
+   */
+  public int read(long position, byte[] buffer, int offset, int length)
+  throws IOException;
+  
+  /**
+   * Read the specified number of bytes, from a given
+   * position within a file. This does not
+   * change the current offset of a file, and is thread-safe.
+   */
+  public void readFully(long position, byte[] buffer, int offset, int length)
+  throws IOException;
+  
+  /**
+   * Read a number of bytes equal to the length of the buffer, from a given
+   * position within a file. This does not
+   * change the current offset of a file, and is thread-safe.
+   */
+  public void readFully(long position, byte[] buffer) throws IOException;
+}

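To make the contract above concrete, a small illustrative fragment; stream stands for any PositionedReadable that is also Seekable (for example an FSInputStream), and the offsets are arbitrary:

    byte[] buf = new byte[1024];
    // read() may return fewer bytes than requested, or -1 at end of file,
    // but never moves the stream's current offset.
    int n = stream.read(5000L, buf, 0, buf.length);
    // readFully() either fills the requested range or throws EOFException
    // ("End of file reached before reading fully."), again without seeking.
    stream.readFully(5000L, buf, 0, buf.length);
    long pos = stream.getPos();            // same value as before the two calls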
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java?view=auto&rev=452282
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java Mon Oct  2 17:17:59 2006
@@ -0,0 +1,125 @@
+package org.apache.hadoop.dfs;
+
+import javax.swing.filechooser.FileSystemView;
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests the DFS positional read functionality in a single node
+ * mini-cluster.
+ * @author Milind Bhandarkar
+ */
+public class TestPread extends TestCase {
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 4096;
+
+  private void writeFile(FileSystem fileSys, Path name) throws IOException {
+    // create and write a file that contains three blocks of data
+    DataOutputStream stm = fileSys.create(name, true, 4096, (short)1,
+        (long)blockSize);
+    byte[] buffer = new byte[(int)(3*blockSize)];
+    Random rand = new Random(seed);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    stm.close();
+  }
+  
+  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
+    for (int idx = 0; idx < actual.length; idx++) {
+      this.assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+          expected[from+idx]+" actual "+actual[idx],
+          actual[idx], expected[from+idx]);
+      actual[idx] = 0;
+    }
+  }
+  
+  private void doPread(FSDataInputStream stm, long position, byte[] buffer,
+      int offset, int length) throws IOException {
+    int nread = 0;
+    while (nread < length) {
+      int nbytes = stm.read(position+nread, buffer, offset+nread, length-nread);
+      assertTrue("Error in pread", nbytes > 0);
+      nread += nbytes;
+    }
+  }
+  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
+    FSDataInputStream stm = fileSys.open(name);
+    byte[] expected = new byte[(int)(3*blockSize)];
+    Random rand = new Random(seed);
+    rand.nextBytes(expected);
+    // do a sanity check. Read first 4K bytes
+    byte[] actual = new byte[4096];
+    stm.readFully(actual);
+    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
+    // now do a pread for the first 8K bytes
+    actual = new byte[8192];
+    doPread(stm, 0L, actual, 0, 8192);
+    checkAndEraseData(actual, 0, expected, "Pread Test 1");
+    // Now check to see if the normal read returns 4K-8K byte range
+    actual = new byte[4096];
+    stm.readFully(actual);
+    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
+    // Now see if we can cross a single block boundary successfully
+    // read 4K bytes from blockSize - 2K offset
+    stm.readFully(blockSize - 2048, actual, 0, 4096);
+    checkAndEraseData(actual, (int)(blockSize-2048), expected, "Pread Test 3");
+    // now see if we can cross two block boundaries successfully
+    // read blockSize + 4K bytes from blockSize - 2K offset
+    actual = new byte[(int)(blockSize+4096)];
+    stm.readFully(blockSize - 2048, actual);
+    checkAndEraseData(actual, (int)(blockSize-2048), expected, "Pread Test 4");
+    // now check that even after all these preads, we can still read
+    // bytes 8K-12K
+    actual = new byte[4096];
+    stm.readFully(actual);
+    checkAndEraseData(actual, 8192, expected, "Pread Test 5");
+    // all done
+    stm.close();
+  }
+  
+  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+    assertTrue(fileSys.exists(name));
+    fileSys.delete(name);
+    assertTrue(!fileSys.exists(name));
+  }
+  
+  /**
+   * Tests positional read in DFS.
+   */
+  public void testPreadDFS() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, false);
+    FileSystem fileSys = cluster.getFileSystem();
+    try {
+      Path file1 = new Path("preadtest.dat");
+      writeFile(fileSys, file1);
+      pReadFile(fileSys, file1);
+      cleanupFile(fileSys, file1);
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
+  
+  /**
+   * Tests positional read in LocalFS.
+   */
+  public void testPreadLocalFS() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fileSys = FileSystem.getNamed("local", conf);
+    try {
+      Path file1 = new Path("build/test/data", "preadtest.dat");
+      writeFile(fileSys, file1);
+      pReadFile(fileSys, file1);
+      cleanupFile(fileSys, file1);
+    } finally {
+      fileSys.close();
+    }
+  }
+}