You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/08/18 05:02:19 UTC

svn commit: r1159004 - in /hadoop/common/trunk/hdfs: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hado...

Author: todd
Date: Thu Aug 18 03:02:19 2011
New Revision: 1159004

URL: http://svn.apache.org/viewvc?rev=1159004&view=rev
Log:
HDFS-2260. Refactor BlockReader into an interface and implementation. Contributed by Todd Lipcon.

Added:
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
Modified:
    hadoop/common/trunk/hdfs/CHANGES.txt
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1159004&r1=1159003&r2=1159004&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Thu Aug 18 03:02:19 2011
@@ -668,6 +668,9 @@ Trunk (unreleased changes)
     HDFS-2265. Remove unnecessary BlockTokenSecretManager fields/methods from
     BlockManager.  (szetszwo)
 
+    HDFS-2260. Refactor BlockReader into an interface and implementation.
+    (todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1159004&r1=1159003&r2=1159004&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java Thu Aug 18 03:02:19 2011
@@ -17,95 +17,18 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FSInputChecker;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-
-
-/** This is a wrapper around connection to datanode
- * and understands checksum, offset etc.
- *
- * Terminology:
- * <dl>
- * <dt>block</dt>
- *   <dd>The hdfs block, typically large (~64MB).
- *   </dd>
- * <dt>chunk</dt>
- *   <dd>A block is divided into chunks, each comes with a checksum.
- *       We want transfers to be chunk-aligned, to be able to
- *       verify checksums.
- *   </dd>
- * <dt>packet</dt>
- *   <dd>A grouping of chunks used for transport. It contains a
- *       header, followed by checksum data, followed by real data.
- *   </dd>
- * </dl>
- * Please see DataNode for the RPC specification.
- */
-@InterfaceAudience.Private
-public class BlockReader extends FSInputChecker {
-
-  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
-  private DataInputStream in;
-  private DataChecksum checksum;
-
-  /** offset in block of the last chunk received */
-  private long lastChunkOffset = -1;
-  private long lastChunkLen = -1;
-  private long lastSeqNo = -1;
-
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
-
-  /** offset in block of of first chunk - may be less than startOffset
-      if startOffset is not chunk-aligned */
-  private final long firstChunkOffset;
 
-  private int bytesPerChecksum;
-  private int checksumSize;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
 
-  /**
-   * The total number of bytes we need to transfer from the DN.
-   * This is the amount that the user has requested plus some padding
-   * at the beginning so that the read can begin on a chunk boundary.
-   */
-  private final long bytesNeededToFinish;
+/**
+ * A BlockReader is responsible for reading a single block
+ * from a single datanode.
+ */
+public interface BlockReader extends Seekable, PositionedReadable {
 
-  private boolean eos = false;
-  private boolean sentStatusCode = false;
-  
-  byte[] skipBuf = null;
-  ByteBuffer checksumBytes = null;
-  /** Amount of unread data in the current received packet */
-  int dataLeft = 0;
-  
-  /* FSInputChecker interface */
-  
   /* same interface as inputStream java.io.InputStream#read()
    * used by DFSInputStream#read()
    * This violates one rule when there is a checksum error:
@@ -113,415 +36,35 @@ public class BlockReader extends FSInput
    * because it first reads the data to user buffer and then checks
    * the checksum.
    */
-  @Override
-  public synchronized int read(byte[] buf, int off, int len) 
-                               throws IOException {
-    
-    // This has to be set here, *before* the skip, since we can
-    // hit EOS during the skip, in the case that our entire read
-    // is smaller than the checksum chunk.
-    boolean eosBefore = eos;
-
-    //for the first read, skip the extra bytes at the front.
-    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
-      // Skip these bytes. But don't call this.skip()!
-      int toSkip = (int)(startOffset - firstChunkOffset);
-      if ( skipBuf == null ) {
-        skipBuf = new byte[bytesPerChecksum];
-      }
-      if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
-        // should never happen
-        throw new IOException("Could not skip required number of bytes");
-      }
-    }
-    
-    int nRead = super.read(buf, off, len);
-
-    // if eos was set in the previous read, send a status code to the DN
-    if (eos && !eosBefore && nRead >= 0) {
-      if (needChecksum()) {
-        sendReadResult(dnSock, Status.CHECKSUM_OK);
-      } else {
-        sendReadResult(dnSock, Status.SUCCESS);
-      }
-    }
-    return nRead;
-  }
-
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    /* How can we make sure we don't throw a ChecksumException, at least
-     * in majority of the cases?. This one throws. */  
-    if ( skipBuf == null ) {
-      skipBuf = new byte[bytesPerChecksum]; 
-    }
-
-    long nSkipped = 0;
-    while ( nSkipped < n ) {
-      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
-      int ret = read(skipBuf, 0, toSkip);
-      if ( ret <= 0 ) {
-        return nSkipped;
-      }
-      nSkipped += ret;
-    }
-    return nSkipped;
-  }
-
-  @Override
-  public int read() throws IOException {
-    throw new IOException("read() is not expected to be invoked. " +
-                          "Use read(buf, off, len) instead.");
-  }
-  
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    /* Checksum errors are handled outside the BlockReader. 
-     * DFSInputStream does not always call 'seekToNewSource'. In the 
-     * case of pread(), it just tries a different replica without seeking.
-     */ 
-    return false;
-  }
-  
-  @Override
-  public void seek(long pos) throws IOException {
-    throw new IOException("Seek() is not supported in BlockInputChecker");
-  }
-
-  @Override
-  protected long getChunkPosition(long pos) {
-    throw new RuntimeException("getChunkPosition() is not supported, " +
-                               "since seek is not required");
-  }
-  
+  int read(byte[] buf, int off, int len) throws IOException;
+
   /**
-   * Makes sure that checksumBytes has enough capacity 
-   * and limit is set to the number of checksum bytes needed 
-   * to be read.
+   * Skip the given number of bytes
    */
-  private void adjustChecksumBytes(int dataLen) {
-    int requiredSize = 
-      ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
-    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
-      checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
-    } else {
-      checksumBytes.clear();
-    }
-    checksumBytes.limit(requiredSize);
-  }
-  
-  @Override
-  protected synchronized int readChunk(long pos, byte[] buf, int offset, 
-                                       int len, byte[] checksumBuf) 
-                                       throws IOException {
-    // Read one chunk.
-    if (eos) {
-      // Already hit EOF
-      return -1;
-    }
-    
-    // Read one DATA_CHUNK.
-    long chunkOffset = lastChunkOffset;
-    if ( lastChunkLen > 0 ) {
-      chunkOffset += lastChunkLen;
-    }
-    
-    // pos is relative to the start of the first chunk of the read.
-    // chunkOffset is relative to the start of the block.
-    // This makes sure that the read passed from FSInputChecker is the
-    // for the same chunk we expect to be reading from the DN.
-    if ( (pos + firstChunkOffset) != chunkOffset ) {
-      throw new IOException("Mismatch in pos : " + pos + " + " + 
-                            firstChunkOffset + " != " + chunkOffset);
-    }
-
-    // Read next packet if the previous packet has been read completely.
-    if (dataLeft <= 0) {
-      //Read packet headers.
-      PacketHeader header = new PacketHeader();
-      header.readFields(in);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("DFSClient readChunk got header " + header);
-      }
-
-      // Sanity check the lengths
-      if (!header.sanityCheck(lastSeqNo)) {
-           throw new IOException("BlockReader: error in packet header " +
-                                 header);
-      }
-
-      lastSeqNo = header.getSeqno();
-      dataLeft = header.getDataLen();
-      adjustChecksumBytes(header.getDataLen());
-      if (header.getDataLen() > 0) {
-        IOUtils.readFully(in, checksumBytes.array(), 0,
-                          checksumBytes.limit());
-      }
-    }
-
-    // Sanity checks
-    assert len >= bytesPerChecksum;
-    assert checksum != null;
-    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
-
-
-    int checksumsToRead, bytesToRead;
-
-    if (checksumSize > 0) {
-
-      // How many chunks left in our packet - this is a ceiling
-      // since we may have a partial chunk at the end of the file
-      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
-
-      // How many chunks we can fit in databuffer
-      //  - note this is a floor since we always read full chunks
-      int chunksCanFit = Math.min(len / bytesPerChecksum,
-                                  checksumBuf.length / checksumSize);
-
-      // How many chunks should we read
-      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
-      // How many bytes should we actually read
-      bytesToRead = Math.min(
-        checksumsToRead * bytesPerChecksum, // full chunks
-        dataLeft); // in case we have a partial
-    } else {
-      // no checksum
-      bytesToRead = Math.min(dataLeft, len);
-      checksumsToRead = 0;
-    }
-
-    if ( bytesToRead > 0 ) {
-      // Assert we have enough space
-      assert bytesToRead <= len;
-      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
-      assert checksumBuf.length >= checksumSize * checksumsToRead;
-      IOUtils.readFully(in, buf, offset, bytesToRead);
-      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
-    }
-
-    dataLeft -= bytesToRead;
-    assert dataLeft >= 0;
-
-    lastChunkOffset = chunkOffset;
-    lastChunkLen = bytesToRead;
-
-    // If there's no data left in the current packet after satisfying
-    // this read, and we have satisfied the client read, we expect
-    // an empty packet header from the DN to signify this.
-    // Note that pos + bytesToRead may in fact be greater since the
-    // DN finishes off the entire last chunk.
-    if (dataLeft == 0 &&
-        pos + bytesToRead >= bytesNeededToFinish) {
-
-      // Read header
-      PacketHeader hdr = new PacketHeader();
-      hdr.readFields(in);
-
-      if (!hdr.isLastPacketInBlock() ||
-          hdr.getDataLen() != 0) {
-        throw new IOException("Expected empty end-of-read packet! Header: " +
-                              hdr);
-      }
-
-      eos = true;
-    }
-
-    if ( bytesToRead == 0 ) {
-      return -1;
-    }
-
-    return bytesToRead;
-  }
-  
-  private BlockReader(String file, String bpid, long blockId,
-      DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
-    // Path is used only for printing block and file information in debug
-    super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
-          1, verifyChecksum,
-          checksum.getChecksumSize() > 0? checksum : null, 
-          checksum.getBytesPerChecksum(),
-          checksum.getChecksumSize());
-    
-    this.dnSock = dnSock;
-    this.in = in;
-    this.checksum = checksum;
-    this.startOffset = Math.max( startOffset, 0 );
-
-    // The total number of bytes that we need to transfer from the DN is
-    // the amount that the user wants (bytesToRead), plus the padding at
-    // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read starts/ends mid-chunk.
-    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-
-    this.firstChunkOffset = firstChunkOffset;
-    lastChunkOffset = firstChunkOffset;
-    lastChunkLen = -1;
-
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-  }
-
-  public static BlockReader newBlockReader(Socket sock, String file,
-      ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
-      long startOffset, long len, int bufferSize) throws IOException {
-    return newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize,
-        true);
-  }
-
-  /** Java Doc required */
-  public static BlockReader newBlockReader( Socket sock, String file, 
-                                     ExtendedBlock block, 
-                                     Token<BlockTokenIdentifier> blockToken,
-                                     long startOffset, long len,
-                                     int bufferSize, boolean verifyChecksum)
-                                     throws IOException {
-    return newBlockReader(sock, file, block, blockToken, startOffset,
-                          len, bufferSize, verifyChecksum, "");
-  }
+  long skip(long n) throws IOException;
 
   /**
-   * Create a new BlockReader specifically to satisfy a read.
-   * This method also sends the OP_READ_BLOCK request.
-   *
-   * @param sock  An established Socket to the DN. The BlockReader will not close it normally
-   * @param file  File location
-   * @param block  The block object
-   * @param blockToken  The block token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param bufferSize  The IO buffer size (not the client buffer size)
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @return New BlockReader instance, or null on error.
+   * Read a single byte, returning -1 at end of stream.
    */
-  public static BlockReader newBlockReader( Socket sock, String file,
-                                     ExtendedBlock block, 
-                                     Token<BlockTokenIdentifier> blockToken,
-                                     long startOffset, long len,
-                                     int bufferSize, boolean verifyChecksum,
-                                     String clientName)
-                                     throws IOException {
-    // in and out will be closed when sock is closed (by the caller)
-    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-          NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));
-    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
-    
-    //
-    // Get bytes in block, set streams
-    //
-
-    DataInputStream in = new DataInputStream(
-        new BufferedInputStream(NetUtils.getInputStream(sock), 
-                                bufferSize));
-    
-    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        vintPrefixed(in));
-    if (status.getStatus() != Status.SUCCESS) {
-      if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
-        throw new InvalidBlockTokenException(
-            "Got access token error for OP_READ_BLOCK, self="
-                + sock.getLocalSocketAddress() + ", remote="
-                + sock.getRemoteSocketAddress() + ", for file " + file
-                + ", for pool " + block.getBlockPoolId() + " block " 
-                + block.getBlockId() + "_" + block.getGenerationStamp());
-      } else {
-        throw new IOException("Got error for OP_READ_BLOCK, self="
-            + sock.getLocalSocketAddress() + ", remote="
-            + sock.getRemoteSocketAddress() + ", for file " + file
-            + ", for pool " + block.getBlockPoolId() + " block " 
-            + block.getBlockId() + "_" + block.getGenerationStamp());
-      }
-    }
-    DataChecksum checksum = DataChecksum.newDataChecksum( in );
-    //Warning when we get CHECKSUM_NULL?
-    
-    // Read the first chunk offset.
-    long firstChunkOffset = in.readLong();
-    
-    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
-      throw new IOException("BlockReader: error in first chunk offset (" +
-                            firstChunkOffset + ") startOffset is " + 
-                            startOffset + " for file " + file);
-    }
-
-    return new BlockReader(file, block.getBlockPoolId(), block.getBlockId(),
-        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    startOffset = -1;
-    checksum = null;
-    if (dnSock != null) {
-      dnSock.close();
-    }
-
-    // in will be closed when its Socket is closed.
-  }
-  
-  /** kind of like readFully(). Only reads as much as possible.
+  int read() throws IOException;
+
+  void close() throws IOException;
+
+  /**
+   * kind of like readFully(). Only reads as much as possible.
    * And allows use of protected readFully().
    */
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return readFully(this, buf, offset, len);
-  }
+  int readAll(byte[] buf, int offset, int len) throws IOException;
 
   /**
    * Take the socket used to talk to the DN.
    */
-  public Socket takeSocket() {
-    assert hasSentStatusCode() :
-      "BlockReader shouldn't give back sockets mid-read";
-    Socket res = dnSock;
-    dnSock = null;
-    return res;
-  }
+  Socket takeSocket();
 
   /**
    * Whether the BlockReader has reached the end of its input stream
    * and successfully sent a status code back to the datanode.
    */
-  public boolean hasSentStatusCode() {
-    return sentStatusCode;
-  }
+  boolean hasSentStatusCode();
 
-  /**
-   * When the reader reaches end of the read, it sends a status response
-   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
-   * closing our connection (which we will re-open), but won't affect
-   * data correctness.
-   */
-  void sendReadResult(Socket sock, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + sock;
-    try {
-      OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
-      
-      ClientReadStatusProto.newBuilder()
-        .setStatus(statusCode)
-        .build()
-        .writeDelimitedTo(out);
-
-      out.flush();
-      sentStatusCode = true;
-    } catch (IOException e) {
-      // It's ok not to be able to send this. But something is probably wrong.
-      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               sock.getInetAddress() + ": " + e.getMessage());
-    }
-  }
-  
-  /**
-   * File name to print when accessing a block directly (from servlets)
-   * @param s Address of the block location
-   * @param poolId Block pool ID of the block
-   * @param blockId Block ID of the block
-   * @return string that has a file name for debug purposes
-   */
-  public static String getFileName(final InetSocketAddress s,
-      final String poolId, final long blockId) {
-    return s.toString() + ":" + poolId + ":" + blockId;
-  }
 }

Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1159004&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Thu Aug 18 03:02:19 2011
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+
+/** 
+ * Utility class to create BlockReader implementations.
+ */
+@InterfaceAudience.Private
+public class BlockReaderFactory {
+  public static BlockReader newBlockReader(Socket sock, String file,
+      ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
+      long startOffset, long len, int bufferSize) throws IOException {
+    return newBlockReader(sock, file, block, blockToken, startOffset,
+        len, bufferSize, true, "");
+  }
+
+  /**
+   * Create a new BlockReader specifically to satisfy a read.
+   * This method also sends the OP_READ_BLOCK request.
+   *
+   * @param sock  An established Socket to the DN. The BlockReader will not close it normally
+   * @param file  File location
+   * @param block  The block object
+   * @param blockToken  The block token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param bufferSize  The IO buffer size (not the client buffer size)
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @return New BlockReader instance; errors are signalled by IOException.
+   */
+  public static BlockReader newBlockReader(
+                                     Socket sock, String file,
+                                     ExtendedBlock block, 
+                                     Token<BlockTokenIdentifier> blockToken,
+                                     long startOffset, long len,
+                                     int bufferSize, boolean verifyChecksum,
+                                     String clientName)
+                                     throws IOException {
+    return RemoteBlockReader.newBlockReader(
+        sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+  }
+  
+  /**
+   * File name to print when accessing a block directly (from servlets)
+   * @param s Address of the block location
+   * @param poolId Block pool ID of the block
+   * @param blockId Block ID of the block
+   * @return string that has a file name for debug purposes
+   */
+  public static String getFileName(final InetSocketAddress s,
+      final String poolId, final long blockId) {
+    return s.toString() + ":" + poolId + ":" + blockId;
+  }
+}

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1159004&r1=1159003&r2=1159004&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu Aug 18 03:02:19 2011
@@ -776,7 +776,7 @@ public class DFSInputStream extends FSIn
       try {
         // The OP_READ_BLOCK request is sent as we make the BlockReader
         BlockReader reader =
-            BlockReader.newBlockReader(sock, file, block,
+            BlockReaderFactory.newBlockReader(sock, file, block,
                                        blockToken,
                                        startOffset, len,
                                        bufferSize, verifyChecksum,

Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1159004&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Thu Aug 18 03:02:19 2011
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+
+/** This is a wrapper around a connection to a datanode
+ * that understands checksums, offsets, etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ *   <dd>The hdfs block, typically large (~64MB).
+ *   </dd>
+ * <dt>chunk</dt>
+ *   <dd>A block is divided into chunks, each comes with a checksum.
+ *       We want transfers to be chunk-aligned, to be able to
+ *       verify checksums.
+ *   </dd>
+ * <dt>packet</dt>
+ *   <dd>A grouping of chunks used for transport. It contains a
+ *       header, followed by checksum data, followed by real data.
+ *   </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
+ */
+@InterfaceAudience.Private
+public class RemoteBlockReader extends FSInputChecker implements BlockReader {
+
+  Socket dnSock; // socket to the DN; for now just used to send the status code (e.g. CHECKSUM_OK) after the read.
+  private DataInputStream in;
+  private DataChecksum checksum;
+
+  /** offset in block of the last chunk received */
+  private long lastChunkOffset = -1;
+  private long lastChunkLen = -1;
+  private long lastSeqNo = -1;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+
+  /** offset in block of the first chunk - may be less than startOffset
+      if startOffset is not chunk-aligned */
+  private final long firstChunkOffset;
+
+  private int bytesPerChecksum;
+  private int checksumSize;
+
+  /**
+   * The total number of bytes we need to transfer from the DN.
+   * This is the amount that the user has requested plus some padding
+   * at the beginning so that the read can begin on a chunk boundary.
+   */
+  private final long bytesNeededToFinish;
+
+  private boolean eos = false;
+  private boolean sentStatusCode = false;
+  
+  byte[] skipBuf = null;
+  ByteBuffer checksumBytes = null;
+  /** Amount of unread data in the current received packet */
+  int dataLeft = 0;
+  
+  /* FSInputChecker interface */
+  
+  /* same interface as java.io.InputStream#read(byte[], int, int);
+   * used by DFSInputStream#read()
+   * This violates one rule when there is a checksum error:
+   * "Read should not modify user buffer before successful read"
+   * because it first reads the data to user buffer and then checks
+   * the checksum.
+   */
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) 
+                               throws IOException {
+    
+    // This has to be set here, *before* the skip, since we can
+    // hit EOS during the skip, in the case that our entire read
+    // is smaller than the checksum chunk.
+    boolean eosBefore = eos;
+
+    //for the first read, skip the extra bytes at the front.
+    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+      // Skip these bytes. But don't call this.skip()!
+      int toSkip = (int)(startOffset - firstChunkOffset);
+      if ( skipBuf == null ) {
+        skipBuf = new byte[bytesPerChecksum];
+      }
+      if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
+        // should never happen
+        throw new IOException("Could not skip required number of bytes");
+      }
+    }
+    
+    int nRead = super.read(buf, off, len);
+
+    // if eos was reached during this read, send a status code to the DN
+    if (eos && !eosBefore && nRead >= 0) {
+      if (needChecksum()) {
+        sendReadResult(dnSock, Status.CHECKSUM_OK);
+      } else {
+        sendReadResult(dnSock, Status.SUCCESS);
+      }
+    }
+    return nRead;
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    /* How can we make sure we don't throw a ChecksumException, at least
+     * in the majority of cases? This one throws. */  
+    if ( skipBuf == null ) {
+      skipBuf = new byte[bytesPerChecksum]; 
+    }
+
+    long nSkipped = 0;
+    while ( nSkipped < n ) {
+      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+      int ret = read(skipBuf, 0, toSkip);
+      if ( ret <= 0 ) {
+        return nSkipped;
+      }
+      nSkipped += ret;
+    }
+    return nSkipped;
+  }
+
+  @Override
+  public int read() throws IOException {
+    throw new IOException("read() is not expected to be invoked. " +
+                          "Use read(buf, off, len) instead.");
+  }
+  
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    /* Checksum errors are handled outside the BlockReader. 
+     * DFSInputStream does not always call 'seekToNewSource'. In the 
+     * case of pread(), it just tries a different replica without seeking.
+     */ 
+    return false;
+  }
+  
+  @Override
+  public void seek(long pos) throws IOException {
+    throw new IOException("Seek() is not supported in BlockInputChecker");
+  }
+
+  @Override
+  protected long getChunkPosition(long pos) {
+    throw new RuntimeException("getChunkPosition() is not supported, " +
+                               "since seek is not required");
+  }
+  
+  /**
+   * Makes sure that checksumBytes has enough capacity 
+   * and limit is set to the number of checksum bytes needed 
+   * to be read.
+   */
+  private void adjustChecksumBytes(int dataLen) {
+    int requiredSize = 
+      ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+      checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
+    } else {
+      checksumBytes.clear();
+    }
+    checksumBytes.limit(requiredSize);
+  }
+  
+  @Override
+  protected synchronized int readChunk(long pos, byte[] buf, int offset, 
+                                       int len, byte[] checksumBuf) 
+                                       throws IOException {
+    // Read one chunk.
+    if (eos) {
+      // Already hit EOF
+      return -1;
+    }
+    
+    // Read one DATA_CHUNK.
+    long chunkOffset = lastChunkOffset;
+    if ( lastChunkLen > 0 ) {
+      chunkOffset += lastChunkLen;
+    }
+    
+    // pos is relative to the start of the first chunk of the read.
+    // chunkOffset is relative to the start of the block.
+    // This makes sure that the read passed from FSInputChecker is
+    // for the same chunk we expect to be reading from the DN.
+    if ( (pos + firstChunkOffset) != chunkOffset ) {
+      throw new IOException("Mismatch in pos : " + pos + " + " + 
+                            firstChunkOffset + " != " + chunkOffset);
+    }
+
+    // Read next packet if the previous packet has been read completely.
+    if (dataLeft <= 0) {
+      //Read packet headers.
+      PacketHeader header = new PacketHeader();
+      header.readFields(in);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("DFSClient readChunk got header " + header);
+      }
+
+      // Sanity check the lengths
+      if (!header.sanityCheck(lastSeqNo)) {
+           throw new IOException("BlockReader: error in packet header " +
+                                 header);
+      }
+
+      lastSeqNo = header.getSeqno();
+      dataLeft = header.getDataLen();
+      adjustChecksumBytes(header.getDataLen());
+      if (header.getDataLen() > 0) {
+        IOUtils.readFully(in, checksumBytes.array(), 0,
+                          checksumBytes.limit());
+      }
+    }
+
+    // Sanity checks
+    assert len >= bytesPerChecksum;
+    assert checksum != null;
+    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+
+
+    int checksumsToRead, bytesToRead;
+
+    if (checksumSize > 0) {
+
+      // How many chunks left in our packet - this is a ceiling
+      // since we may have a partial chunk at the end of the file
+      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+
+      // How many chunks we can fit in databuffer
+      //  - note this is a floor since we always read full chunks
+      int chunksCanFit = Math.min(len / bytesPerChecksum,
+                                  checksumBuf.length / checksumSize);
+
+      // How many chunks should we read
+      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+      // How many bytes should we actually read
+      bytesToRead = Math.min(
+        checksumsToRead * bytesPerChecksum, // full chunks
+        dataLeft); // in case we have a partial
+    } else {
+      // no checksum
+      bytesToRead = Math.min(dataLeft, len);
+      checksumsToRead = 0;
+    }
+
+    if ( bytesToRead > 0 ) {
+      // Assert we have enough space
+      assert bytesToRead <= len;
+      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+      assert checksumBuf.length >= checksumSize * checksumsToRead;
+      IOUtils.readFully(in, buf, offset, bytesToRead);
+      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
+    }
+
+    dataLeft -= bytesToRead;
+    assert dataLeft >= 0;
+
+    lastChunkOffset = chunkOffset;
+    lastChunkLen = bytesToRead;
+
+    // If there's no data left in the current packet after satisfying
+    // this read, and we have satisfied the client read, we expect
+    // an empty packet header from the DN to signify this.
+    // Note that pos + bytesToRead may in fact be greater since the
+    // DN finishes off the entire last chunk.
+    if (dataLeft == 0 &&
+        pos + bytesToRead >= bytesNeededToFinish) {
+
+      // Read header
+      PacketHeader hdr = new PacketHeader();
+      hdr.readFields(in);
+
+      if (!hdr.isLastPacketInBlock() ||
+          hdr.getDataLen() != 0) {
+        throw new IOException("Expected empty end-of-read packet! Header: " +
+                              hdr);
+      }
+
+      eos = true;
+    }
+
+    if ( bytesToRead == 0 ) {
+      return -1;
+    }
+
+    return bytesToRead;
+  }
+  
+  private RemoteBlockReader(String file, String bpid, long blockId,
+      DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
+      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+    // Path is used only for printing block and file information in debug
+    super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
+          1, verifyChecksum,
+          checksum.getChecksumSize() > 0? checksum : null, 
+          checksum.getBytesPerChecksum(),
+          checksum.getChecksumSize());
+    
+    this.dnSock = dnSock;
+    this.in = in;
+    this.checksum = checksum;
+    this.startOffset = Math.max( startOffset, 0 );
+
+    // The total number of bytes that we need to transfer from the DN is
+    // the amount that the user wants (bytesToRead), plus the padding at
+    // the beginning in order to chunk-align. Note that the DN may elect
+    // to send more than this amount if the read starts/ends mid-chunk.
+    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+
+    this.firstChunkOffset = firstChunkOffset;
+    lastChunkOffset = firstChunkOffset;
+    lastChunkLen = -1;
+
+    bytesPerChecksum = this.checksum.getBytesPerChecksum();
+    checksumSize = this.checksum.getChecksumSize();
+  }
+
+  public static RemoteBlockReader newBlockReader(Socket sock, String file,
+      ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
+      long startOffset, long len, int bufferSize) throws IOException {
+    return newBlockReader(sock, file, block, blockToken, startOffset,
+        len, bufferSize, true, "");
+  }
+
+  /**
+   * Create a new BlockReader specifically to satisfy a read.
+   * This method also sends the OP_READ_BLOCK request.
+   *
+   * @param sock  An established Socket to the DN. The BlockReader will not close it normally
+   * @param file  File location
+   * @param block  The block object
+   * @param blockToken  The block token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param bufferSize  The IO buffer size (not the client buffer size)
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @return New BlockReader instance (errors are reported via IOException, never null).
+   */
+  public static RemoteBlockReader newBlockReader( Socket sock, String file,
+                                     ExtendedBlock block, 
+                                     Token<BlockTokenIdentifier> blockToken,
+                                     long startOffset, long len,
+                                     int bufferSize, boolean verifyChecksum,
+                                     String clientName)
+                                     throws IOException {
+    // in and out will be closed when sock is closed (by the caller)
+    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+          NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));
+    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
+    
+    //
+    // Get bytes in block, set streams
+    //
+
+    DataInputStream in = new DataInputStream(
+        new BufferedInputStream(NetUtils.getInputStream(sock), 
+                                bufferSize));
+    
+    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+        vintPrefixed(in));
+    checkSuccess(status, sock, block, file);
+    DataChecksum checksum = DataChecksum.newDataChecksum( in );
+    //Warning when we get CHECKSUM_NULL?
+    
+    // Read the first chunk offset.
+    long firstChunkOffset = in.readLong();
+    
+    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+        firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
+      throw new IOException("BlockReader: error in first chunk offset (" +
+                            firstChunkOffset + ") startOffset is " + 
+                            startOffset + " for file " + file);
+    }
+
+    return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
+        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+  }
+
+  private static void checkSuccess(
+      BlockOpResponseProto status, Socket sock,
+      ExtendedBlock block, String file)
+      throws IOException {
+    if (status.getStatus() != Status.SUCCESS) {
+      if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+        throw new InvalidBlockTokenException(
+            "Got access token error for OP_READ_BLOCK, self="
+                + sock.getLocalSocketAddress() + ", remote="
+                + sock.getRemoteSocketAddress() + ", for file " + file
+                + ", for pool " + block.getBlockPoolId() + " block " 
+                + block.getBlockId() + "_" + block.getGenerationStamp());
+      } else {
+        throw new IOException("Got error for OP_READ_BLOCK, self="
+            + sock.getLocalSocketAddress() + ", remote="
+            + sock.getRemoteSocketAddress() + ", for file " + file
+            + ", for pool " + block.getBlockPoolId() + " block " 
+            + block.getBlockId() + "_" + block.getGenerationStamp());
+      }
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    startOffset = -1;
+    checksum = null;
+    if (dnSock != null) {
+      dnSock.close();
+    }
+
+    // in will be closed when its Socket is closed.
+  }
+  
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return readFully(this, buf, offset, len);
+  }
+
+  @Override
+  public Socket takeSocket() {
+    assert hasSentStatusCode() :
+      "BlockReader shouldn't give back sockets mid-read";
+    Socket res = dnSock;
+    dnSock = null;
+    return res;
+  }
+
+  @Override
+  public boolean hasSentStatusCode() {
+    return sentStatusCode;
+  }
+
+  /**
+   * When the reader reaches end of the read, it sends a status response
+   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+   * closing our connection (which we will re-open), but won't affect
+   * data correctness.
+   */
+  void sendReadResult(Socket sock, Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + sock;
+    try {
+      OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
+      
+      ClientReadStatusProto.newBuilder()
+        .setStatus(statusCode)
+        .build()
+        .writeDelimitedTo(out);
+
+      out.flush();
+      sentStatusCode = true;
+    } catch (IOException e) {
+      // It's ok not to be able to send this. But something is probably wrong.
+      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+               sock.getInetAddress() + ": " + e.getMessage());
+    }
+  }
+  
+  /**
+   * File name to print when accessing a block directly (from servlets)
+   * @param s Address of the block location
+   * @param poolId Block pool ID of the block
+   * @param blockId Block ID of the block
+   * @return string that has a file name for debug purposes
+   */
+  public static String getFileName(final InetSocketAddress s,
+      final String poolId, final long blockId) {
+    return s.toString() + ":" + poolId + ":" + blockId;
+  }
+}

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1159004&r1=1159003&r2=1159004&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Thu Aug 18 03:02:19 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -195,8 +196,8 @@ public class JspHelper {
       // Use the block name for file name. 
     int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
         DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
-    String file = BlockReader.getFileName(addr, poolId, blockId);
-    BlockReader blockReader = BlockReader.newBlockReader(s, file,
+    String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
+    BlockReader blockReader = BlockReaderFactory.newBlockReader(s, file,
         new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
         offsetIntoBlock, amtToRead, bufferSize);
         

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1159004&r1=1159003&r2=1159004&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Thu Aug 18 03:02:19 2011
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -504,9 +505,9 @@ public class NamenodeFsck {
         s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
         s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
         
-        String file = BlockReader.getFileName(targetAddr, block.getBlockPoolId(),
+        String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
             block.getBlockId());
-        blockReader = BlockReader.newBlockReader(s, file, block, lblock
+        blockReader = BlockReaderFactory.newBlockReader(s, file, block, lblock
             .getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096));
         
       }  catch (IOException ex) {

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1159004&r1=1159003&r2=1159004&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Thu Aug 18 03:02:19 2011
@@ -143,7 +143,7 @@ public class BlockReaderTestUtil {
     sock.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
     sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
 
-    return BlockReader.newBlockReader(
+    return BlockReaderFactory.newBlockReader(
       sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
       testBlock.getBlockToken(), 
       offset, lenToRead,

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1159004&r1=1159003&r2=1159004&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Thu Aug 18 03:02:19 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs;
 
 import java.util.List;
 
+import org.apache.hadoop.hdfs.RemoteBlockReader;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.fs.Path;
@@ -52,7 +54,8 @@ public class TestClientBlockVerification
    */
   @Test
   public void testBlockVerification() throws Exception {
-    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+    RemoteBlockReader reader = (RemoteBlockReader)spy(
+        util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
     verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
     reader.close();
@@ -63,7 +66,8 @@ public class TestClientBlockVerification
    */
   @Test
   public void testIncompleteRead() throws Exception {
-    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+    RemoteBlockReader reader = (RemoteBlockReader)spy(
+        util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
 
     // We asked the blockreader for the whole file, and only read
@@ -80,7 +84,8 @@ public class TestClientBlockVerification
   @Test
   public void testCompletePartialRead() throws Exception {
     // Ask for half the file
-    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
+    RemoteBlockReader reader = (RemoteBlockReader)spy(
+        util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
     // And read half the file
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
     verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
@@ -99,7 +104,8 @@ public class TestClientBlockVerification
       for (int length : lengths) {
         DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
                            " len=" + length);
-        BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
+        RemoteBlockReader reader = (RemoteBlockReader)spy(
+            util.getBlockReader(testBlock, startOffset, length));
         util.readAndCheckEOS(reader, length, true);
         verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
         reader.close();

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java?rev=1159004&r1=1159003&r2=1159004&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java Thu Aug 18 03:02:19 2011
@@ -28,6 +28,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.SocketCache;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -72,13 +76,13 @@ public class TestConnCache {
    * It verifies that all invocation to DFSInputStream.getBlockReader()
    * use the same socket.
    */
-  private class MockGetBlockReader implements Answer<BlockReader> {
-    public BlockReader reader = null;
+  private class MockGetBlockReader implements Answer<RemoteBlockReader> {
+    public RemoteBlockReader reader = null;
     private Socket sock = null;
 
-    public BlockReader answer(InvocationOnMock invocation) throws Throwable {
-      BlockReader prevReader = reader;
-      reader = (BlockReader) invocation.callRealMethod();
+    public RemoteBlockReader answer(InvocationOnMock invocation) throws Throwable {
+      RemoteBlockReader prevReader = reader;
+      reader = (RemoteBlockReader) invocation.callRealMethod();
       if (sock == null) {
         sock = reader.dnSock;
       } else if (prevReader != null && prevReader.hasSentStatusCode()) {

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1159004&r1=1159003&r2=1159004&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Thu Aug 18 03:02:19 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -139,9 +140,9 @@ public class TestBlockTokenWithDFS {
       s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
       s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
 
-      String file = BlockReader.getFileName(targetAddr, 
+      String file = BlockReaderFactory.getFileName(targetAddr, 
           "test-blockpoolid", block.getBlockId());
-      blockReader = BlockReader.newBlockReader(s, file, block, 
+      blockReader = BlockReaderFactory.newBlockReader(s, file, block, 
           lblock.getBlockToken(), 0, -1, 
           conf.getInt("io.file.buffer.size", 4096));
 

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1159004&r1=1159003&r2=1159004&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Thu Aug 18 03:02:19 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -267,11 +268,11 @@ public class TestDataNodeVolumeFailure {
     s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
     s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
 
-    String file = BlockReader.getFileName(targetAddr, 
+    String file = BlockReaderFactory.getFileName(targetAddr, 
         "test-blockpoolid",
         block.getBlockId());
     BlockReader blockReader = 
-      BlockReader.newBlockReader(s, file, block, lblock
+      BlockReaderFactory.newBlockReader(s, file, block, lblock
         .getBlockToken(), 0, -1, 4096);
 
     // nothing - if it fails - it will throw and exception