You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/04 23:42:09 UTC
[04/50] [abbrv] hadoop git commit: HDFS-8990. Move RemoteBlockReader
to hdfs-client module. Contributed by Mingliang Liu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
deleted file mode 100644
index 2a77cb6..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ /dev/null
@@ -1,477 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.util.EnumSet;
-import java.util.UUID;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * This is a wrapper around connection to datanode
- * and understands checksum, offset etc.
- *
- * Terminology:
- * <dl>
- * <dt>block</dt>
- * <dd>The hdfs block, typically large (~64MB).
- * </dd>
- * <dt>chunk</dt>
- * <dd>A block is divided into chunks, each comes with a checksum.
- * We want transfers to be chunk-aligned, to be able to
- * verify checksums.
- * </dd>
- * <dt>packet</dt>
- * <dd>A grouping of chunks used for transport. It contains a
- * header, followed by checksum data, followed by real data.
- * </dd>
- * </dl>
- * Please see DataNode for the RPC specification.
- *
- * This is a new implementation introduced in Hadoop 0.23 which
- * is more efficient and simpler than the older BlockReader
- * implementation. It should be renamed to RemoteBlockReader
- * once we are confident in it.
- */
-@InterfaceAudience.Private
-public class RemoteBlockReader2 implements BlockReader {
-
- static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
-
- final private Peer peer;
- final private DatanodeID datanodeID;
- final private PeerCache peerCache;
- final private long blockId;
- private final ReadableByteChannel in;
- private DataChecksum checksum;
-
- private final PacketReceiver packetReceiver = new PacketReceiver(true);
- private ByteBuffer curDataSlice = null;
-
- /** sequence number of the last data-carrying packet received; -1 until then */
- private long lastSeqNo = -1;
-
- /** offset in block where reader wants to actually read */
- private long startOffset;
- private final String filename;
-
- private final int bytesPerChecksum;
- private final int checksumSize;
-
- /**
- * The total number of bytes we need to transfer from the DN.
- * This is the amount that the user has requested plus some padding
- * at the beginning so that the read can begin on a chunk boundary.
- */
- private long bytesNeededToFinish;
-
- /**
- * True if we are reading from a local DataNode.
- */
- private final boolean isLocal;
-
- private final boolean verifyChecksum;
-
- private boolean sentStatusCode = false;
-
- @VisibleForTesting
- public Peer getPeer() {
- return peer;
- }
-
- @Override
- public synchronized int read(byte[] buf, int off, int len)
- throws IOException { /*
- * Copies at most len bytes of buffered packet data into buf, starting at
- * index off. readNextPacket() is invoked at most once per call. Returns
- * the number of bytes copied, or -1 when the current data slice is empty
- * after that optional fetch.
- */
-
- UUID randomId = null;
- if (LOG.isTraceEnabled()) {
- randomId = UUID.randomUUID(); // pairs the start and finish trace lines of this call
- LOG.trace(String.format("Starting read #%s file %s from datanode %s",
- randomId.toString(), this.filename,
- this.datanodeID.getHostName()));
- }
-
- if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { /*
- * The and-operator binds tighter than the or-operator here: fetch when no
- * packet has been received yet, or when the current slice is drained and
- * bytes are still owed to the caller.
- */
- TraceScope scope = Trace.startSpan(
- "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
- try {
- readNextPacket();
- } finally {
- scope.close(); // close the span even when the fetch throws
- }
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format("Finishing read #" + randomId)); // randomId is still null if tracing was off at the first check
- }
-
- if (curDataSlice.remaining() == 0) {
- // we're at EOF now
- return -1;
- }
-
- int nRead = Math.min(curDataSlice.remaining(), len); // never copy more than is buffered
- curDataSlice.get(buf, off, nRead);
-
- return nRead;
- }
-
-
- @Override
- public synchronized int read(ByteBuffer buf) throws IOException { /*
- * ByteBuffer variant of read: transfers as many buffered bytes as fit in
- * buf.remaining(), fetching one more packet first when needed. Returns
- * the number of bytes transferred, or -1 when the current data slice is
- * empty after that optional fetch.
- */
- if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { /*
- * Same condition as the byte-array read: nothing received yet, or the
- * slice is drained while bytes are still owed.
- */
- TraceScope scope = Trace.startSpan(
- "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
- try {
- readNextPacket();
- } finally {
- scope.close();
- }
- }
- if (curDataSlice.remaining() == 0) {
- // we're at EOF now
- return -1;
- }
-
- int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
- ByteBuffer writeSlice = curDataSlice.duplicate(); // same bytes, independent position and limit
- writeSlice.limit(writeSlice.position() + nRead); // cap the view so put() moves exactly nRead bytes
- buf.put(writeSlice);
- curDataSlice.position(writeSlice.position()); // advance the real slice by what was consumed
-
- return nRead;
- }
-
- private void readNextPacket() throws IOException { /*
- * Receives the next packet, validates its header, optionally verifies its
- * checksums, and exposes its data through curDataSlice. Once
- * bytesNeededToFinish drops to zero or below, it also consumes the
- * trailing empty packet and sends the read status to the datanode.
- * Throws IOException when the header fails its sanity check.
- */
- // Receive the whole next packet; the receiver parses the header and slices the buffers.
- packetReceiver.receiveNextPacket(in);
-
- PacketHeader curHeader = packetReceiver.getHeader();
- curDataSlice = packetReceiver.getDataSlice();
- assert curDataSlice.capacity() == curHeader.getDataLen();
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("DFSClient readNextPacket got header " + curHeader);
- }
-
- // Sanity check the lengths and the sequence number
- if (!curHeader.sanityCheck(lastSeqNo)) {
- throw new IOException("BlockReader: error in packet header " +
- curHeader);
- }
-
- if (curHeader.getDataLen() > 0) {
- int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; // ceiling of dataLen / bytesPerChecksum
- int checksumsLen = chunks * checksumSize;
-
- assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
- "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() +
- " checksumsLen=" + checksumsLen;
-
- lastSeqNo = curHeader.getSeqno(); // only advanced for packets that carry data
- if (verifyChecksum && curDataSlice.remaining() > 0) {
- // N.B.: the checksum error offset reported here is actually
- // relative to the start of the block, not the start of the file.
- // This is slightly misleading, but preserves the behavior from
- // the older BlockReader.
- checksum.verifyChunkedSums(curDataSlice,
- packetReceiver.getChecksumSlice(),
- filename, curHeader.getOffsetInBlock());
- }
- bytesNeededToFinish -= curHeader.getDataLen();
- }
-
- // First packet will include some data prior to the first byte
- // the user requested. Skip it.
- if (curHeader.getOffsetInBlock() < startOffset) {
- int newPos = (int) (startOffset - curHeader.getOffsetInBlock()); // bytes of chunk-alignment padding to drop
- curDataSlice.position(newPos);
- }
-
- // If we've now satisfied the whole client read, read one last packet
- // header, which should be empty
- if (bytesNeededToFinish <= 0) {
- readTrailingEmptyPacket();
- if (verifyChecksum) { // the reported status reflects whether checksums were verified
- sendReadResult(Status.CHECKSUM_OK);
- } else {
- sendReadResult(Status.SUCCESS);
- }
- }
- }
-
- @Override
- public synchronized long skip(long n) throws IOException { /*
- * Skips up to n bytes by advancing the position of the buffered packet
- * data, receiving further packets as each one is used up. Returns the
- * number of bytes actually skipped, which is less than n when the read
- * runs out of data first.
- */
- /* Open question from the original author: how to avoid throwing a
- * ChecksumException here in the majority of cases. As written, skipped
- * packets still go through readNextPacket() and its checksum step. */
- long skipped = 0;
- while (skipped < n) {
- long needToSkip = n - skipped;
- if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { // same fetch condition as read(); no trace span here
- readNextPacket();
- }
- if (curDataSlice.remaining() == 0) {
- // we're at EOF now
- break;
- }
-
- int skip = (int)Math.min(curDataSlice.remaining(), needToSkip); // bounded by an int remaining(), so the cast is safe
- curDataSlice.position(curDataSlice.position() + skip);
- skipped += skip;
- }
- return skipped;
- }
-
- private void readTrailingEmptyPacket() throws IOException { /*
- * Receives one more packet and checks that it is the end-of-read marker:
- * flagged as last packet in block and carrying zero data bytes.
- * Throws IOException when either condition does not hold.
- */
- if (LOG.isTraceEnabled()) {
- LOG.trace("Reading empty packet at end of read");
- }
-
- packetReceiver.receiveNextPacket(in);
-
- PacketHeader trailer = packetReceiver.getHeader();
- if (!trailer.isLastPacketInBlock() ||
- trailer.getDataLen() != 0) {
- throw new IOException("Expected empty end-of-read packet! Header: " +
- trailer);
- }
- }
-
- protected RemoteBlockReader2(String file, String bpid, long blockId,
- DataChecksum checksum, boolean verifyChecksum,
- long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
- DatanodeID datanodeID, PeerCache peerCache) { /*
- * Initializes the reader state from the given connection, checksum
- * settings and read range. The bpid argument is not used in this
- * constructor body.
- */
- this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
- createSocketAddr(datanodeID.getXferAddr()));
- // NOTE(review): Path here presumably means the file argument, which this class passes to trace logging and checksum verification
- this.peer = peer;
- this.datanodeID = datanodeID;
- this.in = peer.getInputStreamChannel();
- this.checksum = checksum;
- this.verifyChecksum = verifyChecksum;
- this.startOffset = Math.max( startOffset, 0 ); // negative offsets are clamped to 0 for the field
- this.filename = file;
- this.peerCache = peerCache;
- this.blockId = blockId;
-
- // The total number of bytes that we need to transfer from the DN is
- // the amount that the user wants (bytesToRead), plus the padding at
- // the beginning in order to chunk-align. Note that the DN may elect
- // to send more than this amount if the read starts/ends mid-chunk.
- this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); // uses the startOffset parameter, not the clamped field
- bytesPerChecksum = this.checksum.getBytesPerChecksum();
- checksumSize = this.checksum.getChecksumSize();
- }
-
-
- @Override
- public synchronized void close() throws IOException { /*
- * Releases the packet receiver, clears the read state, and then either
- * hands the peer to the cache or closes it.
- */
- packetReceiver.close();
- startOffset = -1;
- checksum = null;
- if (peerCache != null && sentStatusCode) { // cache the peer only when a cache exists and the read status was sent
- peerCache.put(datanodeID, peer);
- } else {
- peer.close();
- }
-
- // in will be closed when its Socket is closed.
- }
-
- /**
- * When the reader reaches end of the read, it sends a status response
- * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
- * closing our connection (which we will re-open), but won't affect
- * data correctness.
- *
- * An IOException while sending is logged at info level and not rethrown.
- * sentStatusCode is set only when the write succeeds; close() reads that
- * flag to decide whether the peer may go back to the peer cache.
- *
- * @param statusCode the status to report to the datanode
- */
- void sendReadResult(Status statusCode) {
- assert !sentStatusCode : "already sent status code to " + peer;
- try {
- writeReadResult(peer.getOutputStream(), statusCode);
- sentStatusCode = true;
- } catch (IOException e) {
- // It's ok not to be able to send this. But something is probably wrong.
- LOG.info("Could not send read status (" + statusCode + ") to datanode " +
- peer.getRemoteAddressString() + ": " + e.getMessage());
- }
- }
-
- /**
- * Serialize the actual read result on the wire.
- *
- * Builds a ClientReadStatusProto holding the status, writes it to the
- * stream with writeDelimitedTo, and flushes the stream.
- *
- * @param out stream to write the status message to
- * @param statusCode the status to encode
- * @throws IOException if writing or flushing the stream fails
- */
- static void writeReadResult(OutputStream out, Status statusCode)
- throws IOException {
-
- ClientReadStatusProto.newBuilder()
- .setStatus(statusCode)
- .build()
- .writeDelimitedTo(out);
-
- out.flush();
- }
-
- /**
- * File name to print when accessing a block directly (from servlets)
- * @param s Address of the block location
- * @param poolId Block pool ID of the block
- * @param blockId Block ID of the block
- * @return the address, pool ID and block ID joined with colons, in that
- * order; intended as a file name for debug purposes
- */
- public static String getFileName(final InetSocketAddress s,
- final String poolId, final long blockId) {
- return s.toString() + ":" + poolId + ":" + blockId;
- }
-
- @Override
- public int readAll(byte[] buf, int offset, int len) throws IOException {
- return BlockReaderUtil.readAll(this, buf, offset, len);
- }
-
- @Override
- public void readFully(byte[] buf, int off, int len) throws IOException {
- BlockReaderUtil.readFully(this, buf, off, len);
- }
-
- /**
- * Create a new BlockReader specifically to satisfy a read.
- * This method also sends the OP_READ_BLOCK request.
- *
- * After sending the request it parses the datanode response, checks its
- * status, and validates the first chunk offset reported by the datanode
- * before constructing the reader.
- *
- * @param file File location
- * @param block The block object
- * @param blockToken The block token for security
- * @param startOffset The read offset, relative to block head
- * @param len The number of bytes to read
- * @param verifyChecksum Whether to verify checksum
- * @param clientName Client name
- * @param peer The Peer to use
- * @param datanodeID The DatanodeID this peer is connected to
- * @param peerCache Cache that receives the peer on close once the read
- * status has been sent; close() tolerates null
- * @param cachingStrategy Passed through to the read request
- * @return New BlockReader instance. No path in this method returns null;
- * failures surface as IOException.
- * @throws IOException if the request or response I/O fails, or if the
- * first chunk offset is inconsistent with startOffset.
- * NOTE(review): checkSuccess presumably also throws for a
- * non-success status; confirm in DataTransferProtoUtil.
- */
- public static BlockReader newBlockReader(String file,
- ExtendedBlock block,
- Token<BlockTokenIdentifier> blockToken,
- long startOffset, long len,
- boolean verifyChecksum,
- String clientName,
- Peer peer, DatanodeID datanodeID,
- PeerCache peerCache,
- CachingStrategy cachingStrategy) throws IOException {
- // in and out will be closed when sock is closed (by the caller)
- final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
- peer.getOutputStream()));
- new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
- verifyChecksum, cachingStrategy);
-
- //
- // Read and check the datanode response to the request
- //
- DataInputStream in = new DataInputStream(peer.getInputStream());
-
- BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
- PBHelperClient.vintPrefixed(in));
- checkSuccess(status, peer, block, file);
- ReadOpChecksumInfoProto checksumInfo =
- status.getReadOpChecksumInfo();
- DataChecksum checksum = DataTransferProtoUtil.fromProto(
- checksumInfo.getChecksum());
- //Warning when we get CHECKSUM_NULL?
-
- // Read the first chunk offset.
- long firstChunkOffset = checksumInfo.getChunkOffset();
-
- if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
- firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { // must be non-negative, at or before startOffset, and less than one chunk before it
- throw new IOException("BlockReader: error in first chunk offset (" +
- firstChunkOffset + ") startOffset is " +
- startOffset + " for file " + file);
- }
-
- return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
- checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer,
- datanodeID, peerCache);
- }
-
- static void checkSuccess(
- BlockOpResponseProto status, Peer peer,
- ExtendedBlock block, String file)
- throws IOException { /*
- * Builds a description of this OP_READ_BLOCK exchange (local and remote
- * address, file, block pool, block id and generation stamp) and passes
- * it with the response to DataTransferProtoUtil.checkBlockOpStatus.
- * NOTE(review): that helper presumably throws IOException on a
- * non-success status; confirm there.
- */
- String logInfo = "for OP_READ_BLOCK"
- + ", self=" + peer.getLocalAddressString()
- + ", remote=" + peer.getRemoteAddressString()
- + ", for file " + file
- + ", for pool " + block.getBlockPoolId()
- + " block " + block.getBlockId() + "_" + block.getGenerationStamp();
- DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
- }
-
- @Override
- public int available() throws IOException {
- // An optimistic estimate of how much data is available
- // to us without doing network I/O; a fixed constant, independent of reader state.
- return DFSClient.TCP_WINDOW_SIZE;
- }
-
- @Override
- public boolean isLocal() {
- return isLocal;
- }
-
- @Override
- public boolean isShortCircuit() {
- return false;
- }
-
- @Override
- public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
deleted file mode 100644
index c9966a7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
-import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Shorts;
-import com.google.common.primitives.Ints;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- * Header data for each packet that goes through the read/write pipelines.
- * Includes all of the information about the packet, excluding checksums and
- * actual data.
- *
- * This data includes:
- * - the offset in bytes into the HDFS block of the data in this packet
- * - the sequence number of this packet in the pipeline
- * - whether or not this is the last packet in the pipeline
- * - the length of the data in this packet
- * - whether or not this packet should be synced by the DNs.
- *
- * When serialized, this header is written out as a protocol buffer, preceded
- * by a 4-byte integer representing the full packet length, and a 2-byte short
- * representing the header length.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class PacketHeader {
- private static final int MAX_PROTO_SIZE =
- PacketHeaderProto.newBuilder()
- .setOffsetInBlock(0)
- .setSeqno(0)
- .setLastPacketInBlock(false)
- .setDataLen(0)
- .setSyncBlock(false)
- .build().getSerializedSize();
- public static final int PKT_LENGTHS_LEN =
- Ints.BYTES + Shorts.BYTES;
- public static final int PKT_MAX_HEADER_LEN =
- PKT_LENGTHS_LEN + MAX_PROTO_SIZE;
-
- private int packetLen;
- private PacketHeaderProto proto;
-
- public PacketHeader() {
- }
-
- public PacketHeader(int packetLen, long offsetInBlock, long seqno,
- boolean lastPacketInBlock, int dataLen, boolean syncBlock) { /*
- * Builds a header from explicit field values. packetLen must be at
- * least Ints.BYTES (4); otherwise Preconditions.checkArgument rejects
- * it. The syncBlock field is written into the proto only when true.
- */
- this.packetLen = packetLen;
- Preconditions.checkArgument(packetLen >= Ints.BYTES,
- "packet len %s should always be at least 4 bytes",
- packetLen);
-
- PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
- .setOffsetInBlock(offsetInBlock)
- .setSeqno(seqno)
- .setLastPacketInBlock(lastPacketInBlock)
- .setDataLen(dataLen);
-
- if (syncBlock) {
- // Only set syncBlock if it is specified.
- // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
- // because it changes the length of the packet header, and BlockReceiver
- // in that version did not support variable-length headers.
- builder.setSyncBlock(syncBlock);
- }
-
- proto = builder.build();
- }
-
- public int getDataLen() {
- return proto.getDataLen();
- }
-
- public boolean isLastPacketInBlock() {
- return proto.getLastPacketInBlock();
- }
-
- public long getSeqno() {
- return proto.getSeqno();
- }
-
- public long getOffsetInBlock() {
- return proto.getOffsetInBlock();
- }
-
- public int getPacketLen() {
- return packetLen;
- }
-
- public boolean getSyncBlock() {
- return proto.getSyncBlock();
- }
-
- @Override
- public String toString() {
- return "PacketHeader with packetLen=" + packetLen +
- " header data: " +
- proto.toString();
- }
-
- public void setFieldsFromData(
- int packetLen, byte[] headerData) throws InvalidProtocolBufferException { /*
- * Replaces the state of this header: stores packetLen as given and
- * parses headerData as a PacketHeaderProto. No range check is applied
- * to packetLen here.
- */
- this.packetLen = packetLen;
- proto = PacketHeaderProto.parseFrom(headerData);
- }
-
- public void readFields(ByteBuffer buf) throws IOException { /*
- * Parses a serialized header from buf at its current position: a 4-byte
- * packet length, a 2-byte proto length, then that many bytes of
- * PacketHeaderProto. The buffer position advances past what was read.
- */
- packetLen = buf.getInt();
- short protoLen = buf.getShort(); // signed; NOTE(review): a negative value is not rejected before the array allocation below
- byte[] data = new byte[protoLen];
- buf.get(data);
- proto = PacketHeaderProto.parseFrom(data);
- }
-
- public void readFields(DataInputStream in) throws IOException { /*
- * Stream variant of readFields: reads a 4-byte packet length, a 2-byte
- * proto length, then exactly that many bytes, and parses them as a
- * PacketHeaderProto.
- */
- this.packetLen = in.readInt();
- short protoLen = in.readShort(); // signed; NOTE(review): a negative value is not rejected before the array allocation below
- byte[] data = new byte[protoLen];
- in.readFully(data);
- proto = PacketHeaderProto.parseFrom(data);
- }
-
- /**
- * @return the number of bytes necessary to write out this header,
- * including the length-prefixing of the payload and header, that is
- * PKT_LENGTHS_LEN plus the serialized size of the proto
- */
- public int getSerializedSize() {
- return PKT_LENGTHS_LEN + proto.getSerializedSize();
- }
-
- /**
- * Write the header into the buffer.
- * This requires that getSerializedSize() bytes are available in the
- * buffer, which is never more than PKT_MAX_HEADER_LEN while the assertion
- * below holds.
- *
- * Layout written: 4-byte packetLen, 2-byte proto size, then the proto.
- *
- * @param buf destination buffer, written at its current position
- */
- public void putInBuffer(final ByteBuffer buf) {
- assert proto.getSerializedSize() <= MAX_PROTO_SIZE
- : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
- try {
- buf.putInt(packetLen);
- buf.putShort((short) proto.getSerializedSize());
- proto.writeTo(new ByteBufferOutputStream(buf));
- } catch (IOException e) {
- throw new RuntimeException(e); // the declared IOException of writeTo is rethrown unchecked
- }
- }
-
- public void write(DataOutputStream out) throws IOException { /*
- * Writes the header to the stream in the same layout as putInBuffer:
- * 4-byte packetLen, 2-byte proto size, then the proto bytes.
- */
- assert proto.getSerializedSize() <= MAX_PROTO_SIZE
- : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
- out.writeInt(packetLen);
- out.writeShort(proto.getSerializedSize());
- proto.writeTo(out);
- }
-
- public byte[] getBytes() { /*
- * Returns the serialized header as a new byte array of exactly
- * getSerializedSize() bytes.
- */
- ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
- putInBuffer(buf);
- return buf.array(); // backing array of the heap buffer allocated above
- }
-
- /**
- * Perform a sanity check on the packet, returning true if it is sane.
- * @param lastSeqNo the previous sequence number received - we expect the current
- * sequence number to be larger by 1.
- * @return false if a packet not flagged as last has a data length of zero
- * or less, if a packet flagged as last has a non-zero data length,
- * or if the sequence number is not lastSeqNo + 1; true otherwise
- */
- public boolean sanityCheck(long lastSeqNo) {
- // We should only have a non-positive data length for the last packet
- if (proto.getDataLen() <= 0 && !proto.getLastPacketInBlock()) return false;
- // The last packet should not contain data
- if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
- // Seqnos should always increase by 1 with each packet received
- if (proto.getSeqno() != lastSeqNo + 1) return false;
- return true;
- }
-
- @Override
- public boolean equals(Object o) { /*
- * Two headers are equal when their protos are equal; packetLen is not
- * part of the comparison. The proto of this instance must already be
- * set, which the no-arg constructor does not do.
- */
- if (!(o instanceof PacketHeader)) return false;
- PacketHeader other = (PacketHeader)o;
- return this.proto.equals(other.proto);
- }
-
- @Override
- public int hashCode() {
- return (int)proto.getSeqno(); // sequence number truncated to int; equal protos share a seqno, so this agrees with equals
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
deleted file mode 100644
index 3045a13..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.Closeable;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.util.DirectBufferPool;
-import org.apache.hadoop.io.IOUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-
-/**
- * Class to handle reading packets one-at-a-time from the wire.
- * These packets are used both for reading and writing data to/from
- * DataNodes.
- */
-@InterfaceAudience.Private
-public class PacketReceiver implements Closeable {
-
- /**
- * The max size of any single packet. This prevents OOMEs when
- * invalid data is sent.
- */
- private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
-
- static final Log LOG = LogFactory.getLog(PacketReceiver.class);
-
- private static final DirectBufferPool bufferPool = new DirectBufferPool();
- private final boolean useDirectBuffers;
-
- /**
- * The entirety of the most recently read packet.
- * The first PKT_LENGTHS_LEN bytes of this buffer are the
- * length prefixes.
- */
- private ByteBuffer curPacketBuf = null;
-
- /**
- * A slice of {@link #curPacketBuf} which contains just the checksums.
- */
- private ByteBuffer curChecksumSlice = null;
-
- /**
- * A slice of {@link #curPacketBuf} which contains just the data.
- */
- private ByteBuffer curDataSlice = null;
-
- /**
- * The packet header of the most recently read packet.
- */
- private PacketHeader curHeader;
-
- public PacketReceiver(boolean useDirectBuffers) { /*
- * Records the buffer kind and allocates an initial packet buffer just
- * large enough for the length prefixes; doRead grows it per packet.
- */
- this.useDirectBuffers = useDirectBuffers;
- reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN);
- }
-
- public PacketHeader getHeader() {
- return curHeader;
- }
-
- public ByteBuffer getDataSlice() {
- return curDataSlice;
- }
-
- public ByteBuffer getChecksumSlice() {
- return curChecksumSlice;
- }
-
- /**
- * Reads all of the data for the next packet into the appropriate buffers.
- *
- * The data slice and checksum slice members will be set to point to the
- * user data and corresponding checksums. The header will be parsed and
- * set.
- *
- * @param in channel to read the packet from
- * @throws IOException if the channel ends early or the packet lengths
- * fail validation in doRead
- */
- public void receiveNextPacket(ReadableByteChannel in) throws IOException {
- doRead(in, null);
- }
-
- /**
- * Stream variant of packet reception. This path copies into the backing
- * array of the packet buffer, so doReadFully rejects direct buffers here.
- *
- * @see #receiveNextPacket(ReadableByteChannel)
- */
- public void receiveNextPacket(InputStream in) throws IOException {
- doRead(null, in);
- }
-
- private void doRead(ReadableByteChannel ch, InputStream in)
- throws IOException { /*
- * Shared implementation behind both receiveNextPacket overloads; the
- * callers pass exactly one of ch and in as non-null. Reads the length
- * prefixes, validates them, reads the rest of the packet, parses the
- * header and re-slices the buffer into checksum and data views.
- */
- // Each packet looks like:
- // PLEN HLEN HEADER CHECKSUMS DATA
- // 32-bit 16-bit <protobuf> <variable length>
- //
- // PLEN: Payload length
- // = length(PLEN) + length(CHECKSUMS) + length(DATA)
- // This length includes its own encoded length in
- // the sum for historical reasons.
- //
- // HLEN: Header length
- // = length(HEADER)
- //
- // HEADER: the actual packet header fields, encoded in protobuf
- // CHECKSUMS: the crcs for the data chunk. May be missing if
- // checksums were not requested
- // DATA the actual block data
- Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock()); // refuse to read past a packet flagged as last in block
-
- curPacketBuf.clear();
- curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN); // first read only the PLEN and HLEN prefixes
- doReadFully(ch, in, curPacketBuf);
- curPacketBuf.flip();
- int payloadLen = curPacketBuf.getInt();
-
- if (payloadLen < Ints.BYTES) {
- // The "payload length" includes its own length. Therefore it
- // should never be less than 4 bytes
- throw new IOException("Invalid payload length " +
- payloadLen);
- }
- int dataPlusChecksumLen = payloadLen - Ints.BYTES;
- int headerLen = curPacketBuf.getShort(); // signed 16-bit value widened to int, hence the negative check
- if (headerLen < 0) {
- throw new IOException("Invalid header length " + headerLen);
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen +
- " headerLen = " + headerLen);
- }
-
- // Sanity check the buffer size so we don't allocate too much memory
- // and OOME.
- int totalLen = payloadLen + headerLen; // an int overflow would show up as a negative sum
- if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) {
- throw new IOException("Incorrect value for packet payload size: " +
- payloadLen); // NOTE(review): the check is on totalLen but the message reports payloadLen
- }
-
- // Make sure we have space for the whole packet, and
- // read it.
- reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN +
- dataPlusChecksumLen + headerLen);
- curPacketBuf.clear();
- curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN); // append after the prefixes already at the front
- curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN +
- dataPlusChecksumLen + headerLen);
- doReadFully(ch, in, curPacketBuf);
- curPacketBuf.flip(); // limit becomes the end of the packet, position 0
- curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN); // step past the prefixes to the header bytes
-
- // Extract the header from the front of the buffer (after the length prefixes)
- byte[] headerBuf = new byte[headerLen];
- curPacketBuf.get(headerBuf);
- if (curHeader == null) {
- curHeader = new PacketHeader(); // one header object is reused for every later packet
- }
- curHeader.setFieldsFromData(payloadLen, headerBuf);
-
- // Compute the sub-slices of the packet
- int checksumLen = dataPlusChecksumLen - curHeader.getDataLen(); // whatever is not data is treated as checksums
- if (checksumLen < 0) {
- throw new IOException("Invalid packet: data length in packet header " +
- "exceeds data length received. dataPlusChecksumLen=" +
- dataPlusChecksumLen + " header: " + curHeader);
- }
-
- reslicePacket(headerLen, checksumLen, curHeader.getDataLen());
- }
-
- /**
- * Rewrite the last-read packet on the wire to the given output stream.
- *
- * Only usable when this receiver was created with non-direct buffers,
- * because the bytes are taken from the backing array of the buffer. The
- * write starts at arrayOffset() and ignores the buffer position;
- * reslicePacket leaves the position at 0 with the limit at the end of
- * the packet.
- *
- * @param mirrorOut stream that receives the packet bytes
- * @throws IOException if the write fails
- */
- public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
- Preconditions.checkState(!useDirectBuffers,
- "Currently only supported for non-direct buffers");
- mirrorOut.write(curPacketBuf.array(),
- curPacketBuf.arrayOffset(),
- curPacketBuf.remaining());
- }
-
-
- private static void doReadFully(ReadableByteChannel ch, InputStream in,
- ByteBuffer buf) throws IOException { /*
- * Fills buf from its position up to its limit, reading from ch when it
- * is non-null and from in otherwise. The stream path needs a buffer
- * with an accessible backing array, so direct buffers are rejected.
- */
- if (ch != null) {
- readChannelFully(ch, buf);
- } else {
- Preconditions.checkState(!buf.isDirect(),
- "Must not use direct buffers with InputStream API");
- IOUtils.readFully(in, buf.array(),
- buf.arrayOffset() + buf.position(),
- buf.remaining());
- buf.position(buf.position() + buf.remaining()); // the array write bypassed the buffer, so advance to the limit by hand
- }
- }
-
- private void reslicePacket(
- int headerLen, int checksumsLen, int dataLen) { /*
- * Points curChecksumSlice and curDataSlice at their regions of
- * curPacketBuf and then resets curPacketBuf to span the whole packet.
- * Expects the buffer position just past the header and the limit at the
- * end of the data, which the assertions below check.
- */
- // Packet structure (refer to doRead() for details):
- // PLEN HLEN HEADER CHECKSUMS DATA
- // 32-bit 16-bit <protobuf> <variable length>
- // |--- lenThroughHeader ----|
- // |----------- lenThroughChecksums ----|
- // |------------------- lenThroughData ------|
- int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen;
- int lenThroughChecksums = lenThroughHeader + checksumsLen;
- int lenThroughData = lenThroughChecksums + dataLen;
-
- assert dataLen >= 0 : "invalid datalen: " + dataLen;
- assert curPacketBuf.position() == lenThroughHeader;
- assert curPacketBuf.limit() == lenThroughData :
- "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
- " rem=" + curPacketBuf.remaining();
-
- // Slice the checksums.
- curPacketBuf.position(lenThroughHeader);
- curPacketBuf.limit(lenThroughChecksums);
- curChecksumSlice = curPacketBuf.slice(); // view over the checksum bytes only
-
- // Slice the data.
- curPacketBuf.position(lenThroughChecksums);
- curPacketBuf.limit(lenThroughData);
- curDataSlice = curPacketBuf.slice(); // view over the data bytes only
-
- // Reset buffer to point to the entirety of the packet (including
- // length prefixes)
- curPacketBuf.position(0);
- curPacketBuf.limit(lenThroughData);
- }
-
-
- private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
- throws IOException { /*
- * Reads from the channel until buf has no space left before its limit.
- * A negative read result is reported as a premature EOF; a result of
- * zero simply loops again.
- */
- while (buf.remaining() > 0) {
- int n = ch.read(buf);
- if (n < 0) {
- throw new IOException("Premature EOF reading from " + ch);
- }
- }
- }
-
- private void reallocPacketBuf(int atLeastCapacity) { /*
- * Ensures curPacketBuf can hold atLeastCapacity bytes. A buffer that is
- * already large enough is kept as is; otherwise a new one is obtained
- * (from the direct buffer pool or the heap, per useDirectBuffers), the
- * bytes read so far are copied over, and the old one is released.
- */
- // Realloc the buffer if this packet is longer than the previous
- // one.
- if (curPacketBuf == null ||
- curPacketBuf.capacity() < atLeastCapacity) {
- ByteBuffer newBuf;
- if (useDirectBuffers) {
- newBuf = bufferPool.getBuffer(atLeastCapacity);
- } else {
- newBuf = ByteBuffer.allocate(atLeastCapacity);
- }
- // If reallocing an existing buffer, copy the old packet length
- // prefixes over
- if (curPacketBuf != null) {
- curPacketBuf.flip(); // expose bytes 0 up to the current position for copying
- newBuf.put(curPacketBuf);
- }
-
- returnPacketBufToPool(); // only hands back direct buffers; a heap buffer is just dropped
- curPacketBuf = newBuf;
- }
- }
-
- private void returnPacketBufToPool() { /*
- * Hands a direct packet buffer back to the shared pool and clears the
- * field. Heap buffers and a null field are left untouched.
- */
- if (curPacketBuf != null && curPacketBuf.isDirect()) {
- bufferPool.returnBuffer(curPacketBuf);
- curPacketBuf = null;
- }
- }
-
- @Override // Closeable
- public void close() {
- returnPacketBufToPool(); // releases a direct buffer to the pool; safe to call more than once since the field is nulled
- }
-
- @Override
- protected void finalize() throws Throwable {
- try {
- // just in case it didn't get closed, we
- // may as well still try to return the buffer
- returnPacketBufToPool();
- } finally {
- super.finalize(); // always chain to the superclass finalizer
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
deleted file mode 100644
index 31d4dcc..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.util;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * OutputStream that writes into a {@link ByteBuffer}.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Stable
-public class ByteBufferOutputStream extends OutputStream {
-
- private final ByteBuffer buf;
-
- public ByteBufferOutputStream(ByteBuffer buf) {
- this.buf = buf;
- }
-
- @Override
- public void write(int b) throws IOException {
- buf.put((byte)b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- buf.put(b, off, len);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
index 8dd3d6f..5ff343a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
@@ -24,10 +24,10 @@ import static org.mockito.Mockito.verify;
import java.util.List;
-import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -41,7 +41,7 @@ public class TestClientBlockVerification {
static LocatedBlock testBlock = null;
static {
- ((Log4JLogger)RemoteBlockReader2.LOG).getLogger().setLevel(Level.ALL);
+ GenericTestUtils.setLogLevel(RemoteBlockReader2.LOG, Level.ALL);
}
@BeforeClass
public static void setupCluster() throws Exception {