Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/04 23:42:09 UTC

[04/50] [abbrv] hadoop git commit: HDFS-8990. Move RemoteBlockReader to hdfs-client module. Contributed by Mingliang Liu.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
deleted file mode 100644
index 2a77cb6..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ /dev/null
@@ -1,477 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.util.EnumSet;
-import java.util.UUID;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * This is a wrapper around connection to datanode
- * and understands checksum, offset etc.
- *
- * Terminology:
- * <dl>
- * <dt>block</dt>
- *   <dd>The hdfs block, typically large (~64MB).
- *   </dd>
- * <dt>chunk</dt>
- *   <dd>A block is divided into chunks, each comes with a checksum.
- *       We want transfers to be chunk-aligned, to be able to
- *       verify checksums.
- *   </dd>
- * <dt>packet</dt>
- *   <dd>A grouping of chunks used for transport. It contains a
- *       header, followed by checksum data, followed by real data.
- *   </dd>
- * </dl>
- * Please see DataNode for the RPC specification.
- *
- * This is a new implementation introduced in Hadoop 0.23 which
- * is more efficient and simpler than the older BlockReader
- * implementation. It should be renamed to RemoteBlockReader
- * once we are confident in it.
- */
-@InterfaceAudience.Private
-public class RemoteBlockReader2  implements BlockReader {
-
-  static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
-  
-  final private Peer peer;
-  final private DatanodeID datanodeID;
-  final private PeerCache peerCache;
-  final private long blockId;
-  private final ReadableByteChannel in;
-  private DataChecksum checksum;
-  
-  private final PacketReceiver packetReceiver = new PacketReceiver(true);
-  private ByteBuffer curDataSlice = null;
-
-  /** sequence number of the last packet received */
-  private long lastSeqNo = -1;
-
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
-  private final String filename;
-
-  private final int bytesPerChecksum;
-  private final int checksumSize;
-
-  /**
-   * The total number of bytes we need to transfer from the DN.
-   * This is the amount that the user has requested plus some padding
-   * at the beginning so that the read can begin on a chunk boundary.
-   */
-  private long bytesNeededToFinish;
-
-  /**
-   * True if we are reading from a local DataNode.
-   */
-  private final boolean isLocal;
-
-  private final boolean verifyChecksum;
-
-  private boolean sentStatusCode = false;
-
-  @VisibleForTesting
-  public Peer getPeer() {
-    return peer;
-  }
-  
-  @Override
-  public synchronized int read(byte[] buf, int off, int len) 
-                               throws IOException {
-
-    UUID randomId = null;
-    if (LOG.isTraceEnabled()) {
-      randomId = UUID.randomUUID();
-      LOG.trace(String.format("Starting read #%s file %s from datanode %s",
-        randomId.toString(), this.filename,
-        this.datanodeID.getHostName()));
-    }
-
-    if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
-      TraceScope scope = Trace.startSpan(
-          "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
-      try {
-        readNextPacket();
-      } finally {
-        scope.close();
-      }
-    }
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(String.format("Finishing read #" + randomId));
-    }
-
-    if (curDataSlice.remaining() == 0) {
-      // we're at EOF now
-      return -1;
-    }
-    
-    int nRead = Math.min(curDataSlice.remaining(), len);
-    curDataSlice.get(buf, off, nRead);
-    
-    return nRead;
-  }
-
-
-  @Override
-  public synchronized int read(ByteBuffer buf) throws IOException {
-    if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
-      TraceScope scope = Trace.startSpan(
-          "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
-      try {
-        readNextPacket();
-      } finally {
-        scope.close();
-      }
-    }
-    if (curDataSlice.remaining() == 0) {
-      // we're at EOF now
-      return -1;
-    }
-
-    int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
-    ByteBuffer writeSlice = curDataSlice.duplicate();
-    writeSlice.limit(writeSlice.position() + nRead);
-    buf.put(writeSlice);
-    curDataSlice.position(writeSlice.position());
-
-    return nRead;
-  }
-
-  private void readNextPacket() throws IOException {
-    //Read packet headers.
-    packetReceiver.receiveNextPacket(in);
-
-    PacketHeader curHeader = packetReceiver.getHeader();
-    curDataSlice = packetReceiver.getDataSlice();
-    assert curDataSlice.capacity() == curHeader.getDataLen();
-    
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("DFSClient readNextPacket got header " + curHeader);
-    }
-
-    // Sanity check the lengths
-    if (!curHeader.sanityCheck(lastSeqNo)) {
-         throw new IOException("BlockReader: error in packet header " +
-                               curHeader);
-    }
-    
-    if (curHeader.getDataLen() > 0) {
-      int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
-      int checksumsLen = chunks * checksumSize;
-
-      assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
-        "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + 
-          " checksumsLen=" + checksumsLen;
-      
-      lastSeqNo = curHeader.getSeqno();
-      if (verifyChecksum && curDataSlice.remaining() > 0) {
-        // N.B.: the checksum error offset reported here is actually
-        // relative to the start of the block, not the start of the file.
-        // This is slightly misleading, but preserves the behavior from
-        // the older BlockReader.
-        checksum.verifyChunkedSums(curDataSlice,
-            packetReceiver.getChecksumSlice(),
-            filename, curHeader.getOffsetInBlock());
-      }
-      bytesNeededToFinish -= curHeader.getDataLen();
-    }    
-    
-    // First packet will include some data prior to the first byte
-    // the user requested. Skip it.
-    if (curHeader.getOffsetInBlock() < startOffset) {
-      int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
-      curDataSlice.position(newPos);
-    }
-
-    // If we've now satisfied the whole client read, read one last packet
-    // header, which should be empty
-    if (bytesNeededToFinish <= 0) {
-      readTrailingEmptyPacket();
-      if (verifyChecksum) {
-        sendReadResult(Status.CHECKSUM_OK);
-      } else {
-        sendReadResult(Status.SUCCESS);
-      }
-    }
-  }
-  
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    /* How can we make sure we don't throw a ChecksumException, at least
-     * in the majority of cases? This one throws. */
-    long skipped = 0;
-    while (skipped < n) {
-      long needToSkip = n - skipped;
-      if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
-        readNextPacket();
-      }
-      if (curDataSlice.remaining() == 0) {
-        // we're at EOF now
-        break;
-      }
-
-      int skip = (int)Math.min(curDataSlice.remaining(), needToSkip);
-      curDataSlice.position(curDataSlice.position() + skip);
-      skipped += skip;
-    }
-    return skipped;
-  }
-
-  private void readTrailingEmptyPacket() throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Reading empty packet at end of read");
-    }
-    
-    packetReceiver.receiveNextPacket(in);
-
-    PacketHeader trailer = packetReceiver.getHeader();
-    if (!trailer.isLastPacketInBlock() ||
-       trailer.getDataLen() != 0) {
-      throw new IOException("Expected empty end-of-read packet! Header: " +
-                            trailer);
-    }
-  }
-
-  protected RemoteBlockReader2(String file, String bpid, long blockId,
-      DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache) {
-    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
-        createSocketAddr(datanodeID.getXferAddr()));
-    // Path is used only for printing block and file information in debug
-    this.peer = peer;
-    this.datanodeID = datanodeID;
-    this.in = peer.getInputStreamChannel();
-    this.checksum = checksum;
-    this.verifyChecksum = verifyChecksum;
-    this.startOffset = Math.max( startOffset, 0 );
-    this.filename = file;
-    this.peerCache = peerCache;
-    this.blockId = blockId;
-
-    // The total number of bytes that we need to transfer from the DN is
-    // the amount that the user wants (bytesToRead), plus the padding at
-    // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read starts/ends mid-chunk.
-    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-  }
-
-
-  @Override
-  public synchronized void close() throws IOException {
-    packetReceiver.close();
-    startOffset = -1;
-    checksum = null;
-    if (peerCache != null && sentStatusCode) {
-      peerCache.put(datanodeID, peer);
-    } else {
-      peer.close();
-    }
-
-    // in will be closed when its Socket is closed.
-  }
-  
-  /**
-   * When the reader reaches end of the read, it sends a status response
-   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
-   * closing our connection (which we will re-open), but won't affect
-   * data correctness.
-   */
-  void sendReadResult(Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + peer;
-    try {
-      writeReadResult(peer.getOutputStream(), statusCode);
-      sentStatusCode = true;
-    } catch (IOException e) {
-      // It's ok not to be able to send this. But something is probably wrong.
-      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               peer.getRemoteAddressString() + ": " + e.getMessage());
-    }
-  }
-
-  /**
-   * Serialize the actual read result on the wire.
-   */
-  static void writeReadResult(OutputStream out, Status statusCode)
-      throws IOException {
-    
-    ClientReadStatusProto.newBuilder()
-      .setStatus(statusCode)
-      .build()
-      .writeDelimitedTo(out);
-
-    out.flush();
-  }
-  
-  /**
-   * File name to print when accessing a block directly (from servlets)
-   * @param s Address of the block location
-   * @param poolId Block pool ID of the block
-   * @param blockId Block ID of the block
-   * @return string that has a file name for debug purposes
-   */
-  public static String getFileName(final InetSocketAddress s,
-      final String poolId, final long blockId) {
-    return s.toString() + ":" + poolId + ":" + blockId;
-  }
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return BlockReaderUtil.readAll(this, buf, offset, len);
-  }
-
-  @Override
-  public void readFully(byte[] buf, int off, int len) throws IOException {
-    BlockReaderUtil.readFully(this, buf, off, len);
-  }
-  
-  /**
-   * Create a new BlockReader specifically to satisfy a read.
-   * This method also sends the OP_READ_BLOCK request.
-   *
-   * @param file  File location
-   * @param block  The block object
-   * @param blockToken  The block token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @param peer  The Peer to use
-   * @param datanodeID  The DatanodeID this peer is connected to
-   * @return New BlockReader instance, or null on error.
-   */
-  public static BlockReader newBlockReader(String file,
-                                     ExtendedBlock block,
-                                     Token<BlockTokenIdentifier> blockToken,
-                                     long startOffset, long len,
-                                     boolean verifyChecksum,
-                                     String clientName,
-                                     Peer peer, DatanodeID datanodeID,
-                                     PeerCache peerCache,
-                                     CachingStrategy cachingStrategy) throws IOException {
-    // in and out will be closed when sock is closed (by the caller)
-    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-          peer.getOutputStream()));
-    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum, cachingStrategy);
-
-    //
-    // Get bytes in block
-    //
-    DataInputStream in = new DataInputStream(peer.getInputStream());
-
-    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        PBHelperClient.vintPrefixed(in));
-    checkSuccess(status, peer, block, file);
-    ReadOpChecksumInfoProto checksumInfo =
-      status.getReadOpChecksumInfo();
-    DataChecksum checksum = DataTransferProtoUtil.fromProto(
-        checksumInfo.getChecksum());
-    //Warning when we get CHECKSUM_NULL?
-
-    // Read the first chunk offset.
-    long firstChunkOffset = checksumInfo.getChunkOffset();
-
-    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
-      throw new IOException("BlockReader: error in first chunk offset (" +
-                            firstChunkOffset + ") startOffset is " +
-                            startOffset + " for file " + file);
-    }
-
-    return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
-        checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer,
-        datanodeID, peerCache);
-  }
-
-  static void checkSuccess(
-      BlockOpResponseProto status, Peer peer,
-      ExtendedBlock block, String file)
-      throws IOException {
-    String logInfo = "for OP_READ_BLOCK"
-      + ", self=" + peer.getLocalAddressString()
-      + ", remote=" + peer.getRemoteAddressString()
-      + ", for file " + file
-      + ", for pool " + block.getBlockPoolId()
-      + " block " + block.getBlockId() + "_" + block.getGenerationStamp();
-    DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
-  }
-  
-  @Override
-  public int available() throws IOException {
-    // An optimistic estimate of how much data is available
-    // to us without doing network I/O.
-    return DFSClient.TCP_WINDOW_SIZE;
-  }
-  
-  @Override
-  public boolean isLocal() {
-    return isLocal;
-  }
-  
-  @Override
-  public boolean isShortCircuit() {
-    return false;
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    return null;
-  }
-}
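
The read(byte[], int, int) implementation above hands back at most one packet's worth of data per call, so a caller that wants a fixed number of bytes has to loop; that is essentially what the readAll()/readFully() overrides delegate to BlockReaderUtil for. A minimal sketch of such a loop, written against nothing but the BlockReader interface used above (the helper name is made up for illustration):

    // Illustrative helper, not part of this patch: drain up to len bytes from a
    // BlockReader into buf. Mirrors readAll() semantics: the return value may be
    // smaller than len if the end of the block read is reached first.
    static int readAllSketch(BlockReader reader, byte[] buf, int off, int len)
        throws java.io.IOException {
      int total = 0;
      while (total < len) {
        int n = reader.read(buf, off + total, len - total);
        if (n == -1) {
          break; // EOF: the trailing empty packet has already been consumed
        }
        total += n;
      }
      return total;
    }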

http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
deleted file mode 100644
index c9966a7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
-import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Shorts;
-import com.google.common.primitives.Ints;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- * Header data for each packet that goes through the read/write pipelines.
- * Includes all of the information about the packet, excluding checksums and
- * actual data.
- * 
- * This data includes:
- *  - the offset in bytes into the HDFS block of the data in this packet
- *  - the sequence number of this packet in the pipeline
- *  - whether or not this is the last packet in the pipeline
- *  - the length of the data in this packet
- *  - whether or not this packet should be synced by the DNs.
- *  
- * When serialized, this header is written out as a protocol buffer, preceded
- * by a 4-byte integer representing the full packet length, and a 2-byte short
- * representing the header length.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class PacketHeader {
-  private static final int MAX_PROTO_SIZE = 
-    PacketHeaderProto.newBuilder()
-      .setOffsetInBlock(0)
-      .setSeqno(0)
-      .setLastPacketInBlock(false)
-      .setDataLen(0)
-      .setSyncBlock(false)
-      .build().getSerializedSize();
-  public static final int PKT_LENGTHS_LEN =
-      Ints.BYTES + Shorts.BYTES;
-  public static final int PKT_MAX_HEADER_LEN =
-      PKT_LENGTHS_LEN + MAX_PROTO_SIZE;
-
-  private int packetLen;
-  private PacketHeaderProto proto;
-
-  public PacketHeader() {
-  }
-
-  public PacketHeader(int packetLen, long offsetInBlock, long seqno,
-                      boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
-    this.packetLen = packetLen;
-    Preconditions.checkArgument(packetLen >= Ints.BYTES,
-        "packet len %s should always be at least 4 bytes",
-        packetLen);
-    
-    PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
-      .setOffsetInBlock(offsetInBlock)
-      .setSeqno(seqno)
-      .setLastPacketInBlock(lastPacketInBlock)
-      .setDataLen(dataLen);
-      
-    if (syncBlock) {
-      // Only set syncBlock if it is specified.
-      // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
-      // because it changes the length of the packet header, and BlockReceiver
-      // in that version did not support variable-length headers.
-      builder.setSyncBlock(syncBlock);
-    }
-      
-    proto = builder.build();
-  }
-
-  public int getDataLen() {
-    return proto.getDataLen();
-  }
-
-  public boolean isLastPacketInBlock() {
-    return proto.getLastPacketInBlock();
-  }
-
-  public long getSeqno() {
-    return proto.getSeqno();
-  }
-
-  public long getOffsetInBlock() {
-    return proto.getOffsetInBlock();
-  }
-
-  public int getPacketLen() {
-    return packetLen;
-  }
-
-  public boolean getSyncBlock() {
-    return proto.getSyncBlock();
-  }
-
-  @Override
-  public String toString() {
-    return "PacketHeader with packetLen=" + packetLen +
-      " header data: " + 
-      proto.toString();
-  }
-  
-  public void setFieldsFromData(
-      int packetLen, byte[] headerData) throws InvalidProtocolBufferException {
-    this.packetLen = packetLen;
-    proto = PacketHeaderProto.parseFrom(headerData);
-  }
-  
-  public void readFields(ByteBuffer buf) throws IOException {
-    packetLen = buf.getInt();
-    short protoLen = buf.getShort();
-    byte[] data = new byte[protoLen];
-    buf.get(data);
-    proto = PacketHeaderProto.parseFrom(data);
-  }
-  
-  public void readFields(DataInputStream in) throws IOException {
-    this.packetLen = in.readInt();
-    short protoLen = in.readShort();
-    byte[] data = new byte[protoLen];
-    in.readFully(data);
-    proto = PacketHeaderProto.parseFrom(data);
-  }
-
-  /**
-   * @return the number of bytes necessary to write out this header,
-   * including the length-prefixing of the payload and header
-   */
-  public int getSerializedSize() {
-    return PKT_LENGTHS_LEN + proto.getSerializedSize();
-  }
-
-  /**
-   * Write the header into the buffer.
-   * This requires that PKT_HEADER_LEN bytes are available.
-   */
-  public void putInBuffer(final ByteBuffer buf) {
-    assert proto.getSerializedSize() <= MAX_PROTO_SIZE
-      : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
-    try {
-      buf.putInt(packetLen);
-      buf.putShort((short) proto.getSerializedSize());
-      proto.writeTo(new ByteBufferOutputStream(buf));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-  
-  public void write(DataOutputStream out) throws IOException {
-    assert proto.getSerializedSize() <= MAX_PROTO_SIZE
-    : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
-    out.writeInt(packetLen);
-    out.writeShort(proto.getSerializedSize());
-    proto.writeTo(out);
-  }
-  
-  public byte[] getBytes() {
-    ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
-    putInBuffer(buf);
-    return buf.array();
-  }
-
-  /**
-   * Perform a sanity check on the packet, returning true if it is sane.
-   * @param lastSeqNo the previous sequence number received - we expect the current
-   * sequence number to be larger by 1.
-   */
-  public boolean sanityCheck(long lastSeqNo) {
-    // We should only have a non-positive data length for the last packet
-    if (proto.getDataLen() <= 0 && !proto.getLastPacketInBlock()) return false;
-    // The last packet should not contain data
-    if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
-    // Seqnos should always increase by 1 with each packet received
-    if (proto.getSeqno() != lastSeqNo + 1) return false;
-    return true;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof PacketHeader)) return false;
-    PacketHeader other = (PacketHeader)o;
-    return this.proto.equals(other.proto);
-  }
-
-  @Override
-  public int hashCode() {
-    return (int)proto.getSeqno();
-  }
-}
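
To make the length-prefix layout described in the class javadoc concrete: a serialized header is a 4-byte packet length, then a 2-byte protobuf length, then the PacketHeaderProto bytes, and the class above can round-trip that encoding on its own. A minimal sketch using only methods from this hunk (the field values are illustrative):

    // Round-trip a header through its wire encoding: PLEN (int), HLEN (short),
    // then the protobuf payload.
    static void packetHeaderRoundTrip() throws java.io.IOException {
      PacketHeader out = new PacketHeader(
          4 + 512 + 65536, // packetLen: its own 4 bytes + checksums + data
          0L,              // offsetInBlock
          1L,              // seqno
          false,           // lastPacketInBlock
          65536,           // dataLen
          false);          // syncBlock left unset, as discussed in the constructor
      byte[] wire = out.getBytes();

      PacketHeader in = new PacketHeader();
      in.readFields(java.nio.ByteBuffer.wrap(wire));
      assert in.getSeqno() == 1L && in.getDataLen() == 65536;
    }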

http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
deleted file mode 100644
index 3045a13..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.Closeable;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.util.DirectBufferPool;
-import org.apache.hadoop.io.IOUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-
-/**
- * Class to handle reading packets one-at-a-time from the wire.
- * These packets are used both for reading and writing data to/from
- * DataNodes.
- */
-@InterfaceAudience.Private
-public class PacketReceiver implements Closeable {
-
-  /**
-   * The max size of any single packet. This prevents OOMEs when
-   * invalid data is sent.
-   */
-  private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
-
-  static final Log LOG = LogFactory.getLog(PacketReceiver.class);
-  
-  private static final DirectBufferPool bufferPool = new DirectBufferPool();
-  private final boolean useDirectBuffers;
-
-  /**
-   * The entirety of the most recently read packet.
-   * The first PKT_LENGTHS_LEN bytes of this buffer are the
-   * length prefixes.
-   */
-  private ByteBuffer curPacketBuf = null;
-  
-  /**
-   * A slice of {@link #curPacketBuf} which contains just the checksums.
-   */
-  private ByteBuffer curChecksumSlice = null;
-  
-  /**
-   * A slice of {@link #curPacketBuf} which contains just the data.
-   */
-  private ByteBuffer curDataSlice = null;
-
-  /**
-   * The packet header of the most recently read packet.
-   */
-  private PacketHeader curHeader;
-  
-  public PacketReceiver(boolean useDirectBuffers) {
-    this.useDirectBuffers = useDirectBuffers;
-    reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN);
-  }
-
-  public PacketHeader getHeader() {
-    return curHeader;
-  }
-
-  public ByteBuffer getDataSlice() {
-    return curDataSlice;
-  }
-  
-  public ByteBuffer getChecksumSlice() {
-    return curChecksumSlice;
-  }
-
-  /**
-   * Reads all of the data for the next packet into the appropriate buffers.
-   * 
-   * The data slice and checksum slice members will be set to point to the
-   * user data and corresponding checksums. The header will be parsed and
-   * set.
-   */
-  public void receiveNextPacket(ReadableByteChannel in) throws IOException {
-    doRead(in, null);
-  }
-
-  /**
-   * @see #receiveNextPacket(ReadableByteChannel)
-   */
-  public void receiveNextPacket(InputStream in) throws IOException {
-    doRead(null, in);
-  }
-
-  private void doRead(ReadableByteChannel ch, InputStream in)
-      throws IOException {
-    // Each packet looks like:
-    //   PLEN    HLEN      HEADER     CHECKSUMS  DATA
-    //   32-bit  16-bit   <protobuf>  <variable length>
-    //
-    // PLEN:      Payload length
-    //            = length(PLEN) + length(CHECKSUMS) + length(DATA)
-    //            This length includes its own encoded length in
-    //            the sum for historical reasons.
-    //
-    // HLEN:      Header length
-    //            = length(HEADER)
-    //
-    // HEADER:    the actual packet header fields, encoded in protobuf
-    // CHECKSUMS: the crcs for the data chunk. May be missing if
-    //            checksums were not requested
-    // DATA       the actual block data
-    Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
-
-    curPacketBuf.clear();
-    curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN);
-    doReadFully(ch, in, curPacketBuf);
-    curPacketBuf.flip();
-    int payloadLen = curPacketBuf.getInt();
-    
-    if (payloadLen < Ints.BYTES) {
-      // The "payload length" includes its own length. Therefore it
-      // should never be less than 4 bytes
-      throw new IOException("Invalid payload length " +
-          payloadLen);
-    }
-    int dataPlusChecksumLen = payloadLen - Ints.BYTES;
-    int headerLen = curPacketBuf.getShort();
-    if (headerLen < 0) {
-      throw new IOException("Invalid header length " + headerLen);
-    }
-    
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen +
-          " headerLen = " + headerLen);
-    }
-    
-    // Sanity check the buffer size so we don't allocate too much memory
-    // and OOME.
-    int totalLen = payloadLen + headerLen;
-    if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) {
-      throw new IOException("Incorrect value for packet payload size: " +
-                            payloadLen);
-    }
-
-    // Make sure we have space for the whole packet, and
-    // read it.
-    reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN +
-        dataPlusChecksumLen + headerLen);
-    curPacketBuf.clear();
-    curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
-    curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN +
-        dataPlusChecksumLen + headerLen);
-    doReadFully(ch, in, curPacketBuf);
-    curPacketBuf.flip();
-    curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
-
-    // Extract the header from the front of the buffer (after the length prefixes)
-    byte[] headerBuf = new byte[headerLen];
-    curPacketBuf.get(headerBuf);
-    if (curHeader == null) {
-      curHeader = new PacketHeader();
-    }
-    curHeader.setFieldsFromData(payloadLen, headerBuf);
-    
-    // Compute the sub-slices of the packet
-    int checksumLen = dataPlusChecksumLen - curHeader.getDataLen();
-    if (checksumLen < 0) {
-      throw new IOException("Invalid packet: data length in packet header " + 
-          "exceeds data length received. dataPlusChecksumLen=" +
-          dataPlusChecksumLen + " header: " + curHeader); 
-    }
-    
-    reslicePacket(headerLen, checksumLen, curHeader.getDataLen());
-  }
-  
-  /**
-   * Rewrite the last-read packet on the wire to the given output stream.
-   */
-  public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
-    Preconditions.checkState(!useDirectBuffers,
-        "Currently only supported for non-direct buffers");
-    mirrorOut.write(curPacketBuf.array(),
-        curPacketBuf.arrayOffset(),
-        curPacketBuf.remaining());
-  }
-
-  
-  private static void doReadFully(ReadableByteChannel ch, InputStream in,
-      ByteBuffer buf) throws IOException {
-    if (ch != null) {
-      readChannelFully(ch, buf);
-    } else {
-      Preconditions.checkState(!buf.isDirect(),
-          "Must not use direct buffers with InputStream API");
-      IOUtils.readFully(in, buf.array(),
-          buf.arrayOffset() + buf.position(),
-          buf.remaining());
-      buf.position(buf.position() + buf.remaining());
-    }
-  }
-
-  private void reslicePacket(
-      int headerLen, int checksumsLen, int dataLen) {
-    // Packet structure (refer to doRead() for details):
-    //   PLEN    HLEN      HEADER     CHECKSUMS  DATA
-    //   32-bit  16-bit   <protobuf>  <variable length>
-    //   |--- lenThroughHeader ----|
-    //   |----------- lenThroughChecksums   ----|
-    //   |------------------- lenThroughData    ------| 
-    int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen;
-    int lenThroughChecksums = lenThroughHeader + checksumsLen;
-    int lenThroughData = lenThroughChecksums + dataLen;
-
-    assert dataLen >= 0 : "invalid datalen: " + dataLen;
-    assert curPacketBuf.position() == lenThroughHeader;
-    assert curPacketBuf.limit() == lenThroughData :
-      "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
-      " rem=" + curPacketBuf.remaining();
-
-    // Slice the checksums.
-    curPacketBuf.position(lenThroughHeader);
-    curPacketBuf.limit(lenThroughChecksums);
-    curChecksumSlice = curPacketBuf.slice();
-
-    // Slice the data.
-    curPacketBuf.position(lenThroughChecksums);
-    curPacketBuf.limit(lenThroughData);
-    curDataSlice = curPacketBuf.slice();
-    
-    // Reset buffer to point to the entirety of the packet (including
-    // length prefixes)
-    curPacketBuf.position(0);
-    curPacketBuf.limit(lenThroughData);
-  }
-
-  
-  private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
-      throws IOException {
-    while (buf.remaining() > 0) {
-      int n = ch.read(buf);
-      if (n < 0) {
-        throw new IOException("Premature EOF reading from " + ch);
-      }
-    }
-  }
-  
-  private void reallocPacketBuf(int atLeastCapacity) {
-    // Realloc the buffer if this packet is longer than the previous
-    // one.
-    if (curPacketBuf == null ||
-        curPacketBuf.capacity() < atLeastCapacity) {
-      ByteBuffer newBuf;
-      if (useDirectBuffers) {
-        newBuf = bufferPool.getBuffer(atLeastCapacity);
-      } else {
-        newBuf = ByteBuffer.allocate(atLeastCapacity);
-      }
-      // If reallocing an existing buffer, copy the old packet length
-      // prefixes over
-      if (curPacketBuf != null) {
-        curPacketBuf.flip();
-        newBuf.put(curPacketBuf);
-      }
-      
-      returnPacketBufToPool();
-      curPacketBuf = newBuf;
-    }
-  }
-  
-  private void returnPacketBufToPool() {
-    if (curPacketBuf != null && curPacketBuf.isDirect()) {
-      bufferPool.returnBuffer(curPacketBuf);
-      curPacketBuf = null;
-    }
-  }
-
-  @Override // Closeable
-  public void close() {
-    returnPacketBufToPool();
-  }
-  
-  @Override
-  protected void finalize() throws Throwable {
-    try {
-      // just in case it didn't get closed, we
-      // may as well still try to return the buffer
-      returnPacketBufToPool();
-    } finally {
-      super.finalize();
-    }
-  }
-}
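
A worked instance of the length arithmetic in doRead() and reslicePacket() above, with illustrative numbers (64 KB of data, 512-byte chunks, 4-byte CRC32 checksums): PLEN counts its own 4 bytes, so everything after the two prefixes is PLEN - 4 bytes of checksums plus data, and the checksum slice is whatever remains once the header reports dataLen.

    // Sketch only; the values are made up, the arithmetic mirrors doRead().
    static void lengthArithmeticExample() {
      int payloadLen = 4 + 512 + 65536;          // PLEN as read from the wire
      int headerLen = 25;                        // HLEN as read from the wire
      int dataPlusChecksumLen = payloadLen - 4;  // PLEN includes its own 4 bytes
      int dataLen = 65536;                       // reported by the parsed header
      int checksumLen = dataPlusChecksumLen - dataLen;  // 512 bytes of CRCs
      // curPacketBuf must hold the two length prefixes plus everything after them.
      int needed = PacketHeader.PKT_LENGTHS_LEN + headerLen + checksumLen + dataLen;
      assert checksumLen == 512 && needed == 6 + 25 + 512 + 65536;
    }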

http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
deleted file mode 100644
index 31d4dcc..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.util;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * OutputStream that writes into a {@link ByteBuffer}.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Stable
-public class ByteBufferOutputStream extends OutputStream {
-
-  private final ByteBuffer buf;
-
-  public ByteBufferOutputStream(ByteBuffer buf) {
-    this.buf = buf;
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    buf.put((byte)b);
-  }
-
-  @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    buf.put(b, off, len);
-  }
-}
\ No newline at end of file
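
ByteBufferOutputStream is the small adapter that lets a protobuf (see PacketHeader#putInBuffer above) write itself into a ByteBuffer through the OutputStream API. A minimal usage sketch, with an arbitrary buffer size:

    // Write a few bytes through the adapter; the bytes land at the buffer's
    // current position. The stream does no bounds management, so writing past
    // the buffer's limit throws BufferOverflowException.
    static void byteBufferOutputStreamExample() throws java.io.IOException {
      java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(64);
      java.io.OutputStream os = new ByteBufferOutputStream(buf);
      os.write(new byte[] { 1, 2, 3 });
      buf.flip(); // caller flips before reading the written bytes back
    }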

http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
index 8dd3d6f..5ff343a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
@@ -24,10 +24,10 @@ import static org.mockito.Mockito.verify;
 
 import java.util.List;
 
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -41,7 +41,7 @@ public class TestClientBlockVerification {
   static LocatedBlock testBlock = null;
 
   static {
-    ((Log4JLogger)RemoteBlockReader2.LOG).getLogger().setLevel(Level.ALL);
+    GenericTestUtils.setLogLevel(RemoteBlockReader2.LOG, Level.ALL);
   }
   @BeforeClass
   public static void setupCluster() throws Exception {