Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:58:47 UTC
[35/50] [abbrv] hadoop git commit: HDFS-8990. Move RemoteBlockReader to hdfs-client module. Contributed by Mingliang Liu.
HDFS-8990. Move RemoteBlockReader to hdfs-client module. Contributed by Mingliang Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/826ae1c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/826ae1c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/826ae1c2
Branch: refs/heads/HDFS-7285
Commit: 826ae1c26d31f87d88efc920b271bec7eec2e17a
Parents: caa04de
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Aug 31 13:54:14 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Aug 31 13:54:14 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/hdfs/RemoteBlockReader.java | 512 +++++++++++++++++++
.../apache/hadoop/hdfs/RemoteBlockReader2.java | 480 +++++++++++++++++
.../protocol/datatransfer/PacketHeader.java | 214 ++++++++
.../protocol/datatransfer/PacketReceiver.java | 310 +++++++++++
.../hdfs/util/ByteBufferOutputStream.java | 49 ++
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../java/org/apache/hadoop/hdfs/DFSClient.java | 1 -
.../apache/hadoop/hdfs/RemoteBlockReader.java | 508 ------------------
.../apache/hadoop/hdfs/RemoteBlockReader2.java | 477 -----------------
.../protocol/datatransfer/PacketHeader.java | 214 --------
.../protocol/datatransfer/PacketReceiver.java | 310 -----------
.../hdfs/util/ByteBufferOutputStream.java | 49 --
.../hdfs/TestClientBlockVerification.java | 4 +-
13 files changed, 1570 insertions(+), 1561 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
new file mode 100644
index 0000000..7509da5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * @deprecated this is an old implementation that is being left around
+ * in case any issues spring up with the new {@link RemoteBlockReader2} implementation.
+ * It will be removed in the next release.
+ */
+@InterfaceAudience.Private
+@Deprecated
+public class RemoteBlockReader extends FSInputChecker implements BlockReader {
+ static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader.class);
+
+ private final Peer peer;
+ private final DatanodeID datanodeID;
+ private final DataInputStream in;
+ private DataChecksum checksum;
+
+ /** offset in block of the last chunk received */
+ private long lastChunkOffset = -1;
+ private long lastChunkLen = -1;
+ private long lastSeqNo = -1;
+
+ /** offset in block where reader wants to actually read */
+ private long startOffset;
+
+ private final long blockId;
+
+ /** offset in block of first chunk - may be less than startOffset
+ if startOffset is not chunk-aligned */
+ private final long firstChunkOffset;
+
+ private final int bytesPerChecksum;
+ private final int checksumSize;
+
+ /**
+ * The total number of bytes we need to transfer from the DN.
+ * This is the amount that the user has requested plus some padding
+ * at the beginning so that the read can begin on a chunk boundary.
+ */
+ private final long bytesNeededToFinish;
+
+ /**
+ * True if we are reading from a local DataNode.
+ */
+ private final boolean isLocal;
+
+ private boolean eos = false;
+ private boolean sentStatusCode = false;
+
+ ByteBuffer checksumBytes = null;
+ /** Amount of unread data in the current received packet */
+ int dataLeft = 0;
+
+ private final PeerCache peerCache;
+
+ /* FSInputChecker interface */
+
+ /* same interface as java.io.InputStream#read()
+ * used by DFSInputStream#read()
+ * This violates one rule when there is a checksum error:
+ * "Read should not modify user buffer before successful read"
+ * because it first reads the data to user buffer and then checks
+ * the checksum.
+ */
+ @Override
+ public synchronized int read(byte[] buf, int off, int len)
+ throws IOException {
+
+ // This has to be set here, *before* the skip, since we can
+ // hit EOS during the skip, in the case that our entire read
+ // is smaller than the checksum chunk.
+ boolean eosBefore = eos;
+
+ //for the first read, skip the extra bytes at the front.
+ if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+ // Skip these bytes. But don't call this.skip()!
+ int toSkip = (int)(startOffset - firstChunkOffset);
+ if ( super.readAndDiscard(toSkip) != toSkip ) {
+ // should never happen
+ throw new IOException("Could not skip required number of bytes");
+ }
+ }
+
+ int nRead = super.read(buf, off, len);
+
+ // if eos was set in the previous read, send a status code to the DN
+ if (eos && !eosBefore && nRead >= 0) {
+ if (needChecksum()) {
+ sendReadResult(peer, Status.CHECKSUM_OK);
+ } else {
+ sendReadResult(peer, Status.SUCCESS);
+ }
+ }
+ return nRead;
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ /* How can we make sure we don't throw a ChecksumException, at least
+ * in the majority of cases? This one throws. */
+ long nSkipped = 0;
+ while (nSkipped < n) {
+ int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
+ int ret = readAndDiscard(toSkip);
+ if (ret <= 0) {
+ return nSkipped;
+ }
+ nSkipped += ret;
+ }
+ return nSkipped;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new IOException("read() is not expected to be invoked. " +
+ "Use read(buf, off, len) instead.");
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ /* Checksum errors are handled outside the BlockReader.
+ * DFSInputStream does not always call 'seekToNewSource'. In the
+ * case of pread(), it just tries a different replica without seeking.
+ */
+ return false;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ throw new IOException("Seek() is not supported in BlockInputChecker");
+ }
+
+ @Override
+ protected long getChunkPosition(long pos) {
+ throw new RuntimeException("getChunkPosition() is not supported, " +
+ "since seek is not required");
+ }
+
+ /**
+ * Makes sure that checksumBytes has enough capacity
+ * and limit is set to the number of checksum bytes needed
+ * to be read.
+ */
+ private void adjustChecksumBytes(int dataLen) {
+ int requiredSize =
+ ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+ if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+ checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
+ } else {
+ checksumBytes.clear();
+ }
+ checksumBytes.limit(requiredSize);
+ }
+
+ @Override
+ protected synchronized int readChunk(long pos, byte[] buf, int offset,
+ int len, byte[] checksumBuf)
+ throws IOException {
+ TraceScope scope =
+ Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")",
+ Sampler.NEVER);
+ try {
+ return readChunkImpl(pos, buf, offset, len, checksumBuf);
+ } finally {
+ scope.close();
+ }
+ }
+
+ private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
+ int len, byte[] checksumBuf)
+ throws IOException {
+ // Read one chunk.
+ if (eos) {
+ // Already hit EOF
+ return -1;
+ }
+
+ // Read one DATA_CHUNK.
+ long chunkOffset = lastChunkOffset;
+ if ( lastChunkLen > 0 ) {
+ chunkOffset += lastChunkLen;
+ }
+
+ // pos is relative to the start of the first chunk of the read.
+ // chunkOffset is relative to the start of the block.
+ // This makes sure that the read passed from FSInputChecker is
+ // for the same chunk we expect to be reading from the DN.
+ if ( (pos + firstChunkOffset) != chunkOffset ) {
+ throw new IOException("Mismatch in pos : " + pos + " + " +
+ firstChunkOffset + " != " + chunkOffset);
+ }
+
+ // Read next packet if the previous packet has been read completely.
+ if (dataLeft <= 0) {
+ //Read packet headers.
+ PacketHeader header = new PacketHeader();
+ header.readFields(in);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DFSClient readChunk got header " + header);
+ }
+
+ // Sanity check the lengths
+ if (!header.sanityCheck(lastSeqNo)) {
+ throw new IOException("BlockReader: error in packet header " +
+ header);
+ }
+
+ lastSeqNo = header.getSeqno();
+ dataLeft = header.getDataLen();
+ adjustChecksumBytes(header.getDataLen());
+ if (header.getDataLen() > 0) {
+ IOUtils.readFully(in, checksumBytes.array(), 0,
+ checksumBytes.limit());
+ }
+ }
+
+ // Sanity checks
+ assert len >= bytesPerChecksum;
+ assert checksum != null;
+ assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+
+
+ int checksumsToRead, bytesToRead;
+
+ if (checksumSize > 0) {
+
+ // How many chunks left in our packet - this is a ceiling
+ // since we may have a partial chunk at the end of the file
+ int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+
+ // How many chunks we can fit in databuffer
+ // - note this is a floor since we always read full chunks
+ int chunksCanFit = Math.min(len / bytesPerChecksum,
+ checksumBuf.length / checksumSize);
+
+ // How many chunks should we read
+ checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+ // How many bytes should we actually read
+ bytesToRead = Math.min(
+ checksumsToRead * bytesPerChecksum, // full chunks
+ dataLeft); // in case we have a partial
+ } else {
+ // no checksum
+ bytesToRead = Math.min(dataLeft, len);
+ checksumsToRead = 0;
+ }
+
+ if ( bytesToRead > 0 ) {
+ // Assert we have enough space
+ assert bytesToRead <= len;
+ assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+ assert checksumBuf.length >= checksumSize * checksumsToRead;
+ IOUtils.readFully(in, buf, offset, bytesToRead);
+ checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
+ }
+
+ dataLeft -= bytesToRead;
+ assert dataLeft >= 0;
+
+ lastChunkOffset = chunkOffset;
+ lastChunkLen = bytesToRead;
+
+ // If there's no data left in the current packet after satisfying
+ // this read, and we have satisfied the client read, we expect
+ // an empty packet header from the DN to signify this.
+ // Note that pos + bytesToRead may in fact be greater since the
+ // DN finishes off the entire last chunk.
+ if (dataLeft == 0 &&
+ pos + bytesToRead >= bytesNeededToFinish) {
+
+ // Read header
+ PacketHeader hdr = new PacketHeader();
+ hdr.readFields(in);
+
+ if (!hdr.isLastPacketInBlock() ||
+ hdr.getDataLen() != 0) {
+ throw new IOException("Expected empty end-of-read packet! Header: " +
+ hdr);
+ }
+
+ eos = true;
+ }
+
+ if ( bytesToRead == 0 ) {
+ return -1;
+ }
+
+ return bytesToRead;
+ }
+
+ private RemoteBlockReader(String file, String bpid, long blockId,
+ DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
+ long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+ DatanodeID datanodeID, PeerCache peerCache) {
+ // Path is used only for printing block and file information in debug
+ super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
+ ":" + bpid + ":of:"+ file)/*too non path-like?*/,
+ 1, verifyChecksum,
+ checksum.getChecksumSize() > 0? checksum : null,
+ checksum.getBytesPerChecksum(),
+ checksum.getChecksumSize());
+
+ this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
+ createSocketAddr(datanodeID.getXferAddr()));
+
+ this.peer = peer;
+ this.datanodeID = datanodeID;
+ this.in = in;
+ this.checksum = checksum;
+ this.startOffset = Math.max( startOffset, 0 );
+ this.blockId = blockId;
+
+ // The total number of bytes that we need to transfer from the DN is
+ // the amount that the user wants (bytesToRead), plus the padding at
+ // the beginning in order to chunk-align. Note that the DN may elect
+ // to send more than this amount if the read starts/ends mid-chunk.
+ this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+
+ this.firstChunkOffset = firstChunkOffset;
+ lastChunkOffset = firstChunkOffset;
+ lastChunkLen = -1;
+
+ bytesPerChecksum = this.checksum.getBytesPerChecksum();
+ checksumSize = this.checksum.getChecksumSize();
+ this.peerCache = peerCache;
+ }
+
+ /**
+ * Create a new BlockReader specifically to satisfy a read.
+ * This method also sends the OP_READ_BLOCK request.
+ *
+ * @param file File location
+ * @param block The block object
+ * @param blockToken The block token for security
+ * @param startOffset The read offset, relative to block head
+ * @param len The number of bytes to read
+ * @param bufferSize The IO buffer size (not the client buffer size)
+ * @param verifyChecksum Whether to verify checksum
+ * @param clientName Client name
+ * @return New BlockReader instance, or null on error.
+ */
+ public static RemoteBlockReader newBlockReader(String file,
+ ExtendedBlock block,
+ Token<BlockTokenIdentifier> blockToken,
+ long startOffset, long len,
+ int bufferSize, boolean verifyChecksum,
+ String clientName, Peer peer,
+ DatanodeID datanodeID,
+ PeerCache peerCache,
+ CachingStrategy cachingStrategy)
+ throws IOException {
+ // in and out will be closed when sock is closed (by the caller)
+ final DataOutputStream out =
+ new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+ new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+ verifyChecksum, cachingStrategy);
+
+ //
+ // Get bytes in block, set streams
+ //
+
+ DataInputStream in = new DataInputStream(
+ new BufferedInputStream(peer.getInputStream(), bufferSize));
+
+ BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+ PBHelperClient.vintPrefixed(in));
+ RemoteBlockReader2.checkSuccess(status, peer, block, file);
+ ReadOpChecksumInfoProto checksumInfo =
+ status.getReadOpChecksumInfo();
+ DataChecksum checksum = DataTransferProtoUtil.fromProto(
+ checksumInfo.getChecksum());
+ //Warning when we get CHECKSUM_NULL?
+
+ // Read the first chunk offset.
+ long firstChunkOffset = checksumInfo.getChunkOffset();
+
+ if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+ firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+ throw new IOException("BlockReader: error in first chunk offset (" +
+ firstChunkOffset + ") startOffset is " +
+ startOffset + " for file " + file);
+ }
+
+ return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
+ in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
+ peer, datanodeID, peerCache);
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ startOffset = -1;
+ checksum = null;
+ if (peerCache != null & sentStatusCode) {
+ peerCache.put(datanodeID, peer);
+ } else {
+ peer.close();
+ }
+
+ // in will be closed when its Socket is closed.
+ }
+
+ @Override
+ public void readFully(byte[] buf, int readOffset, int amtToRead)
+ throws IOException {
+ IOUtils.readFully(this, buf, readOffset, amtToRead);
+ }
+
+ @Override
+ public int readAll(byte[] buf, int offset, int len) throws IOException {
+ return readFully(this, buf, offset, len);
+ }
+
+ /**
+ * When the reader reaches end of the read, it sends a status response
+ * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+ * closing our connection (which we will re-open), but won't affect
+ * data correctness.
+ */
+ void sendReadResult(Peer peer, Status statusCode) {
+ assert !sentStatusCode : "already sent status code to " + peer;
+ try {
+ RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
+ sentStatusCode = true;
+ } catch (IOException e) {
+ // It's ok not to be able to send this. But something is probably wrong.
+ LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+ peer.getRemoteAddressString() + ": " + e.getMessage());
+ }
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
+ }
+
+ @Override
+ public int available() throws IOException {
+ // An optimistic estimate of how much data is available
+ // to us without doing network I/O.
+ return RemoteBlockReader2.TCP_WINDOW_SIZE;
+ }
+
+ @Override
+ public boolean isLocal() {
+ return isLocal;
+ }
+
+ @Override
+ public boolean isShortCircuit() {
+ return false;
+ }
+
+ @Override
+ public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+ return null;
+ }
+}
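
Aside: the chunk-alignment arithmetic behind bytesNeededToFinish above can be
checked with a small standalone sketch. Everything here (the class name, the
sample offsets, the 512-byte chunk size) is hypothetical; it only mirrors the
constructor's math and is not part of the patch.

    // Hypothetical sketch of RemoteBlockReader's alignment math: a read
    // starting mid-chunk is padded back to the enclosing chunk boundary.
    public class ChunkAlignmentSketch {
      public static void main(String[] args) {
        int bytesPerChecksum = 512;   // assumed chunk size
        long startOffset = 1000;      // where the user wants to start reading
        long len = 3000;              // how many bytes the user asked for

        // The DN begins the transfer at the enclosing chunk boundary.
        long firstChunkOffset = startOffset - (startOffset % bytesPerChecksum);

        // Bytes the client must consume: the request plus the front padding
        // (the bytesNeededToFinish expression from the constructor).
        long bytesNeededToFinish = len + (startOffset - firstChunkOffset);

        System.out.println(firstChunkOffset);     // 512
        System.out.println(bytesNeededToFinish);  // 3488
      }
    }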
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
new file mode 100644
index 0000000..5541e6d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.EnumSet;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a wrapper around the connection to a datanode and
+ * understands checksums, offsets, etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ * <dd>The hdfs block, typically large (~64MB).
+ * </dd>
+ * <dt>chunk</dt>
+ * <dd>A block is divided into chunks, each comes with a checksum.
+ * We want transfers to be chunk-aligned, to be able to
+ * verify checksums.
+ * </dd>
+ * <dt>packet</dt>
+ * <dd>A grouping of chunks used for transport. It contains a
+ * header, followed by checksum data, followed by real data.
+ * </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
+ *
+ * This is a new implementation introduced in Hadoop 0.23 which
+ * is more efficient and simpler than the older BlockReader
+ * implementation. It should be renamed to RemoteBlockReader
+ * once we are confident in it.
+ */
+@InterfaceAudience.Private
+public class RemoteBlockReader2 implements BlockReader {
+
+ static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader2.class);
+ static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
+
+ final private Peer peer;
+ final private DatanodeID datanodeID;
+ final private PeerCache peerCache;
+ final private long blockId;
+ private final ReadableByteChannel in;
+
+ private DataChecksum checksum;
+ private final PacketReceiver packetReceiver = new PacketReceiver(true);
+
+ private ByteBuffer curDataSlice = null;
+
+ /** sequence number of the last packet received */
+ private long lastSeqNo = -1;
+
+ /** offset in block where reader wants to actually read */
+ private long startOffset;
+ private final String filename;
+
+ private final int bytesPerChecksum;
+ private final int checksumSize;
+
+ /**
+ * The total number of bytes we need to transfer from the DN.
+ * This is the amount that the user has requested plus some padding
+ * at the beginning so that the read can begin on a chunk boundary.
+ */
+ private long bytesNeededToFinish;
+
+ /**
+ * True if we are reading from a local DataNode.
+ */
+ private final boolean isLocal;
+
+ private final boolean verifyChecksum;
+
+ private boolean sentStatusCode = false;
+
+ @VisibleForTesting
+ public Peer getPeer() {
+ return peer;
+ }
+
+ @Override
+ public synchronized int read(byte[] buf, int off, int len)
+ throws IOException {
+
+ UUID randomId = null;
+ if (LOG.isTraceEnabled()) {
+ randomId = UUID.randomUUID();
+ LOG.trace(String.format("Starting read #%s file %s from datanode %s",
+ randomId.toString(), this.filename,
+ this.datanodeID.getHostName()));
+ }
+
+ if (curDataSlice == null || (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
+ TraceScope scope = Trace.startSpan(
+ "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
+ try {
+ readNextPacket();
+ } finally {
+ scope.close();
+ }
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Finishing read #" + randomId);
+ }
+
+ if (curDataSlice.remaining() == 0) {
+ // we're at EOF now
+ return -1;
+ }
+
+ int nRead = Math.min(curDataSlice.remaining(), len);
+ curDataSlice.get(buf, off, nRead);
+
+ return nRead;
+ }
+
+
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ if (curDataSlice == null || (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
+ TraceScope scope = Trace.startSpan(
+ "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
+ try {
+ readNextPacket();
+ } finally {
+ scope.close();
+ }
+ }
+ if (curDataSlice.remaining() == 0) {
+ // we're at EOF now
+ return -1;
+ }
+
+ int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
+ ByteBuffer writeSlice = curDataSlice.duplicate();
+ writeSlice.limit(writeSlice.position() + nRead);
+ buf.put(writeSlice);
+ curDataSlice.position(writeSlice.position());
+
+ return nRead;
+ }
+
+ private void readNextPacket() throws IOException {
+ //Read packet headers.
+ packetReceiver.receiveNextPacket(in);
+
+ PacketHeader curHeader = packetReceiver.getHeader();
+ curDataSlice = packetReceiver.getDataSlice();
+ assert curDataSlice.capacity() == curHeader.getDataLen();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("DFSClient readNextPacket got header " + curHeader);
+ }
+
+ // Sanity check the lengths
+ if (!curHeader.sanityCheck(lastSeqNo)) {
+ throw new IOException("BlockReader: error in packet header " +
+ curHeader);
+ }
+
+ if (curHeader.getDataLen() > 0) {
+ int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
+ int checksumsLen = chunks * checksumSize;
+
+ assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
+ "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() +
+ " checksumsLen=" + checksumsLen;
+
+ lastSeqNo = curHeader.getSeqno();
+ if (verifyChecksum && curDataSlice.remaining() > 0) {
+ // N.B.: the checksum error offset reported here is actually
+ // relative to the start of the block, not the start of the file.
+ // This is slightly misleading, but preserves the behavior from
+ // the older BlockReader.
+ checksum.verifyChunkedSums(curDataSlice,
+ packetReceiver.getChecksumSlice(),
+ filename, curHeader.getOffsetInBlock());
+ }
+ bytesNeededToFinish -= curHeader.getDataLen();
+ }
+
+ // First packet will include some data prior to the first byte
+ // the user requested. Skip it.
+ if (curHeader.getOffsetInBlock() < startOffset) {
+ int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
+ curDataSlice.position(newPos);
+ }
+
+ // If we've now satisfied the whole client read, read one last packet
+ // header, which should be empty
+ if (bytesNeededToFinish <= 0) {
+ readTrailingEmptyPacket();
+ if (verifyChecksum) {
+ sendReadResult(Status.CHECKSUM_OK);
+ } else {
+ sendReadResult(Status.SUCCESS);
+ }
+ }
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ /* How can we make sure we don't throw a ChecksumException, at least
+ * in the majority of cases? This one throws. */
+ long skipped = 0;
+ while (skipped < n) {
+ long needToSkip = n - skipped;
+ if (curDataSlice == null || (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
+ readNextPacket();
+ }
+ if (curDataSlice.remaining() == 0) {
+ // we're at EOF now
+ break;
+ }
+
+ int skip = (int)Math.min(curDataSlice.remaining(), needToSkip);
+ curDataSlice.position(curDataSlice.position() + skip);
+ skipped += skip;
+ }
+ return skipped;
+ }
+
+ private void readTrailingEmptyPacket() throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Reading empty packet at end of read");
+ }
+
+ packetReceiver.receiveNextPacket(in);
+
+ PacketHeader trailer = packetReceiver.getHeader();
+ if (!trailer.isLastPacketInBlock() ||
+ trailer.getDataLen() != 0) {
+ throw new IOException("Expected empty end-of-read packet! Header: " +
+ trailer);
+ }
+ }
+
+ protected RemoteBlockReader2(String file, String bpid, long blockId,
+ DataChecksum checksum, boolean verifyChecksum,
+ long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+ DatanodeID datanodeID, PeerCache peerCache) {
+ this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
+ createSocketAddr(datanodeID.getXferAddr()));
+ // Path is used only for printing block and file information in debug
+ this.peer = peer;
+ this.datanodeID = datanodeID;
+ this.in = peer.getInputStreamChannel();
+ this.checksum = checksum;
+ this.verifyChecksum = verifyChecksum;
+ this.startOffset = Math.max( startOffset, 0 );
+ this.filename = file;
+ this.peerCache = peerCache;
+ this.blockId = blockId;
+
+ // The total number of bytes that we need to transfer from the DN is
+ // the amount that the user wants (bytesToRead), plus the padding at
+ // the beginning in order to chunk-align. Note that the DN may elect
+ // to send more than this amount if the read starts/ends mid-chunk.
+ this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+ bytesPerChecksum = this.checksum.getBytesPerChecksum();
+ checksumSize = this.checksum.getChecksumSize();
+ }
+
+
+ @Override
+ public synchronized void close() throws IOException {
+ packetReceiver.close();
+ startOffset = -1;
+ checksum = null;
+ if (peerCache != null && sentStatusCode) {
+ peerCache.put(datanodeID, peer);
+ } else {
+ peer.close();
+ }
+
+ // in will be closed when its Socket is closed.
+ }
+
+ /**
+ * When the reader reaches end of the read, it sends a status response
+ * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+ * closing our connection (which we will re-open), but won't affect
+ * data correctness.
+ */
+ void sendReadResult(Status statusCode) {
+ assert !sentStatusCode : "already sent status code to " + peer;
+ try {
+ writeReadResult(peer.getOutputStream(), statusCode);
+ sentStatusCode = true;
+ } catch (IOException e) {
+ // It's ok not to be able to send this. But something is probably wrong.
+ LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+ peer.getRemoteAddressString() + ": " + e.getMessage());
+ }
+ }
+
+ /**
+ * Serialize the actual read result on the wire.
+ */
+ static void writeReadResult(OutputStream out, Status statusCode)
+ throws IOException {
+
+ ClientReadStatusProto.newBuilder()
+ .setStatus(statusCode)
+ .build()
+ .writeDelimitedTo(out);
+
+ out.flush();
+ }
+
+ /**
+ * File name to print when accessing a block directly (from servlets)
+ * @param s Address of the block location
+ * @param poolId Block pool ID of the block
+ * @param blockId Block ID of the block
+ * @return string that has a file name for debug purposes
+ */
+ public static String getFileName(final InetSocketAddress s,
+ final String poolId, final long blockId) {
+ return s.toString() + ":" + poolId + ":" + blockId;
+ }
+
+ @Override
+ public int readAll(byte[] buf, int offset, int len) throws IOException {
+ return BlockReaderUtil.readAll(this, buf, offset, len);
+ }
+
+ @Override
+ public void readFully(byte[] buf, int off, int len) throws IOException {
+ BlockReaderUtil.readFully(this, buf, off, len);
+ }
+
+ /**
+ * Create a new BlockReader specifically to satisfy a read.
+ * This method also sends the OP_READ_BLOCK request.
+ *
+ * @param file File location
+ * @param block The block object
+ * @param blockToken The block token for security
+ * @param startOffset The read offset, relative to block head
+ * @param len The number of bytes to read
+ * @param verifyChecksum Whether to verify checksum
+ * @param clientName Client name
+ * @param peer The Peer to use
+ * @param datanodeID The DatanodeID this peer is connected to
+ * @return New BlockReader instance, or null on error.
+ */
+ public static BlockReader newBlockReader(String file,
+ ExtendedBlock block,
+ Token<BlockTokenIdentifier> blockToken,
+ long startOffset, long len,
+ boolean verifyChecksum,
+ String clientName,
+ Peer peer, DatanodeID datanodeID,
+ PeerCache peerCache,
+ CachingStrategy cachingStrategy) throws IOException {
+ // in and out will be closed when sock is closed (by the caller)
+ final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+ peer.getOutputStream()));
+ new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+ verifyChecksum, cachingStrategy);
+
+ //
+ // Get bytes in block
+ //
+ DataInputStream in = new DataInputStream(peer.getInputStream());
+
+ BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+ PBHelperClient.vintPrefixed(in));
+ checkSuccess(status, peer, block, file);
+ ReadOpChecksumInfoProto checksumInfo =
+ status.getReadOpChecksumInfo();
+ DataChecksum checksum = DataTransferProtoUtil.fromProto(
+ checksumInfo.getChecksum());
+ //Warning when we get CHECKSUM_NULL?
+
+ // Read the first chunk offset.
+ long firstChunkOffset = checksumInfo.getChunkOffset();
+
+ if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+ firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+ throw new IOException("BlockReader: error in first chunk offset (" +
+ firstChunkOffset + ") startOffset is " +
+ startOffset + " for file " + file);
+ }
+
+ return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
+ checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer,
+ datanodeID, peerCache);
+ }
+
+ static void checkSuccess(
+ BlockOpResponseProto status, Peer peer,
+ ExtendedBlock block, String file)
+ throws IOException {
+ String logInfo = "for OP_READ_BLOCK"
+ + ", self=" + peer.getLocalAddressString()
+ + ", remote=" + peer.getRemoteAddressString()
+ + ", for file " + file
+ + ", for pool " + block.getBlockPoolId()
+ + " block " + block.getBlockId() + "_" + block.getGenerationStamp();
+ DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
+ }
+
+ @Override
+ public int available() throws IOException {
+ // An optimistic estimate of how much data is available
+ // to us without doing network I/O.
+ return TCP_WINDOW_SIZE;
+ }
+
+ @Override
+ public boolean isLocal() {
+ return isLocal;
+ }
+
+ @Override
+ public boolean isShortCircuit() {
+ return false;
+ }
+
+ @Override
+ public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+ return null;
+ }
+}
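
Aside: the duplicate-and-limit idiom that read(ByteBuffer) uses to copy out of
curDataSlice, without disturbing the slice's own limit, works on any pair of
buffers. A minimal sketch (the class name and sample data are made up):

    import java.nio.ByteBuffer;

    public class SliceCopySketch {
      public static void main(String[] args) {
        ByteBuffer curDataSlice = ByteBuffer.wrap("hello, world".getBytes());
        ByteBuffer buf = ByteBuffer.allocate(5); // user buffer, smaller than the slice

        // Copy through a duplicate so the source's limit is never touched.
        int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
        ByteBuffer writeSlice = curDataSlice.duplicate();
        writeSlice.limit(writeSlice.position() + nRead);
        buf.put(writeSlice);
        curDataSlice.position(writeSlice.position()); // advance the real slice

        System.out.println(new String(buf.array()));  // hello
        System.out.println(curDataSlice.remaining()); // 7
      }
    }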
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
new file mode 100644
index 0000000..c9966a7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
+import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Shorts;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Header data for each packet that goes through the read/write pipelines.
+ * Includes all of the information about the packet, excluding checksums and
+ * actual data.
+ *
+ * This data includes:
+ * - the offset in bytes into the HDFS block of the data in this packet
+ * - the sequence number of this packet in the pipeline
+ * - whether or not this is the last packet in the pipeline
+ * - the length of the data in this packet
+ * - whether or not this packet should be synced by the DNs.
+ *
+ * When serialized, this header is written out as a protocol buffer, preceded
+ * by a 4-byte integer representing the full packet length, and a 2-byte short
+ * representing the header length.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class PacketHeader {
+ private static final int MAX_PROTO_SIZE =
+ PacketHeaderProto.newBuilder()
+ .setOffsetInBlock(0)
+ .setSeqno(0)
+ .setLastPacketInBlock(false)
+ .setDataLen(0)
+ .setSyncBlock(false)
+ .build().getSerializedSize();
+ public static final int PKT_LENGTHS_LEN =
+ Ints.BYTES + Shorts.BYTES;
+ public static final int PKT_MAX_HEADER_LEN =
+ PKT_LENGTHS_LEN + MAX_PROTO_SIZE;
+
+ private int packetLen;
+ private PacketHeaderProto proto;
+
+ public PacketHeader() {
+ }
+
+ public PacketHeader(int packetLen, long offsetInBlock, long seqno,
+ boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
+ this.packetLen = packetLen;
+ Preconditions.checkArgument(packetLen >= Ints.BYTES,
+ "packet len %s should always be at least 4 bytes",
+ packetLen);
+
+ PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
+ .setOffsetInBlock(offsetInBlock)
+ .setSeqno(seqno)
+ .setLastPacketInBlock(lastPacketInBlock)
+ .setDataLen(dataLen);
+
+ if (syncBlock) {
+ // Only set syncBlock if it is specified.
+ // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
+ // because it changes the length of the packet header, and BlockReceiver
+ // in that version did not support variable-length headers.
+ builder.setSyncBlock(syncBlock);
+ }
+
+ proto = builder.build();
+ }
+
+ public int getDataLen() {
+ return proto.getDataLen();
+ }
+
+ public boolean isLastPacketInBlock() {
+ return proto.getLastPacketInBlock();
+ }
+
+ public long getSeqno() {
+ return proto.getSeqno();
+ }
+
+ public long getOffsetInBlock() {
+ return proto.getOffsetInBlock();
+ }
+
+ public int getPacketLen() {
+ return packetLen;
+ }
+
+ public boolean getSyncBlock() {
+ return proto.getSyncBlock();
+ }
+
+ @Override
+ public String toString() {
+ return "PacketHeader with packetLen=" + packetLen +
+ " header data: " +
+ proto.toString();
+ }
+
+ public void setFieldsFromData(
+ int packetLen, byte[] headerData) throws InvalidProtocolBufferException {
+ this.packetLen = packetLen;
+ proto = PacketHeaderProto.parseFrom(headerData);
+ }
+
+ public void readFields(ByteBuffer buf) throws IOException {
+ packetLen = buf.getInt();
+ short protoLen = buf.getShort();
+ byte[] data = new byte[protoLen];
+ buf.get(data);
+ proto = PacketHeaderProto.parseFrom(data);
+ }
+
+ public void readFields(DataInputStream in) throws IOException {
+ this.packetLen = in.readInt();
+ short protoLen = in.readShort();
+ byte[] data = new byte[protoLen];
+ in.readFully(data);
+ proto = PacketHeaderProto.parseFrom(data);
+ }
+
+ /**
+ * @return the number of bytes necessary to write out this header,
+ * including the length-prefixing of the payload and header
+ */
+ public int getSerializedSize() {
+ return PKT_LENGTHS_LEN + proto.getSerializedSize();
+ }
+
+ /**
+ * Write the header into the buffer.
+ * This requires that PKT_MAX_HEADER_LEN bytes are available.
+ */
+ public void putInBuffer(final ByteBuffer buf) {
+ assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+ : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
+ try {
+ buf.putInt(packetLen);
+ buf.putShort((short) proto.getSerializedSize());
+ proto.writeTo(new ByteBufferOutputStream(buf));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void write(DataOutputStream out) throws IOException {
+ assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+ : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
+ out.writeInt(packetLen);
+ out.writeShort(proto.getSerializedSize());
+ proto.writeTo(out);
+ }
+
+ public byte[] getBytes() {
+ ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
+ putInBuffer(buf);
+ return buf.array();
+ }
+
+ /**
+ * Perform a sanity check on the packet, returning true if it is sane.
+ * @param lastSeqNo the previous sequence number received - we expect the current
+ * sequence number to be larger by 1.
+ */
+ public boolean sanityCheck(long lastSeqNo) {
+ // We should only have a non-positive data length for the last packet
+ if (proto.getDataLen() <= 0 && !proto.getLastPacketInBlock()) return false;
+ // The last packet should not contain data
+ if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
+ // Seqnos should always increase by 1 with each packet received
+ if (proto.getSeqno() != lastSeqNo + 1) return false;
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PacketHeader)) return false;
+ PacketHeader other = (PacketHeader)o;
+ return this.proto.equals(other.proto);
+ }
+
+ @Override
+ public int hashCode() {
+ return (int)proto.getSeqno();
+ }
+}
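
Aside: the framing described in the class comment (a 4-byte packet length, then
a 2-byte header length, then the protobuf-encoded header) can be sketched with
a plain ByteBuffer. The header bytes below are an opaque placeholder standing
in for a serialized PacketHeaderProto, and the class name is made up:

    import java.nio.ByteBuffer;

    public class PacketFramingSketch {
      public static void main(String[] args) {
        byte[] headerBytes = {0x08, 0x00, 0x10, 0x01}; // placeholder, not a real proto
        int packetLen = 4 /* PLEN counts itself */ + 0 /* checksums */ + 100 /* data */;

        ByteBuffer buf = ByteBuffer.allocate(4 + 2 + headerBytes.length);
        buf.putInt(packetLen);                    // PLEN, 32-bit
        buf.putShort((short) headerBytes.length); // HLEN, 16-bit
        buf.put(headerBytes);                     // HEADER, protobuf-encoded
        buf.flip();

        // A receiver reads the prefixes back in the same order, which is
        // what PacketHeader#readFields(ByteBuffer) does before parsing.
        System.out.println(buf.getInt() + " " + buf.getShort()); // 104 4
      }
    }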
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
new file mode 100644
index 0000000..c4093b1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.DirectBufferPool;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to handle reading packets one-at-a-time from the wire.
+ * These packets are used both for reading and writing data to/from
+ * DataNodes.
+ */
+@InterfaceAudience.Private
+public class PacketReceiver implements Closeable {
+
+ /**
+ * The max size of any single packet. This prevents OOMEs when
+ * invalid data is sent.
+ */
+ private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
+
+ static final Logger LOG = LoggerFactory.getLogger(PacketReceiver.class);
+
+ private static final DirectBufferPool bufferPool = new DirectBufferPool();
+ private final boolean useDirectBuffers;
+
+ /**
+ * The entirety of the most recently read packet.
+ * The first PKT_LENGTHS_LEN bytes of this buffer are the
+ * length prefixes.
+ */
+ private ByteBuffer curPacketBuf = null;
+
+ /**
+ * A slice of {@link #curPacketBuf} which contains just the checksums.
+ */
+ private ByteBuffer curChecksumSlice = null;
+
+ /**
+ * A slice of {@link #curPacketBuf} which contains just the data.
+ */
+ private ByteBuffer curDataSlice = null;
+
+ /**
+ * The packet header of the most recently read packet.
+ */
+ private PacketHeader curHeader;
+
+ public PacketReceiver(boolean useDirectBuffers) {
+ this.useDirectBuffers = useDirectBuffers;
+ reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN);
+ }
+
+ public PacketHeader getHeader() {
+ return curHeader;
+ }
+
+ public ByteBuffer getDataSlice() {
+ return curDataSlice;
+ }
+
+ public ByteBuffer getChecksumSlice() {
+ return curChecksumSlice;
+ }
+
+ /**
+ * Reads all of the data for the next packet into the appropriate buffers.
+ *
+ * The data slice and checksum slice members will be set to point to the
+ * user data and corresponding checksums. The header will be parsed and
+ * set.
+ */
+ public void receiveNextPacket(ReadableByteChannel in) throws IOException {
+ doRead(in, null);
+ }
+
+ /**
+ * @see #receiveNextPacket(ReadableByteChannel)
+ */
+ public void receiveNextPacket(InputStream in) throws IOException {
+ doRead(null, in);
+ }
+
+ private void doRead(ReadableByteChannel ch, InputStream in)
+ throws IOException {
+ // Each packet looks like:
+ // PLEN HLEN HEADER CHECKSUMS DATA
+ // 32-bit 16-bit <protobuf> <variable length>
+ //
+ // PLEN: Payload length
+ // = length(PLEN) + length(CHECKSUMS) + length(DATA)
+ // This length includes its own encoded length in
+ // the sum for historical reasons.
+ //
+ // HLEN: Header length
+ // = length(HEADER)
+ //
+ // HEADER: the actual packet header fields, encoded in protobuf
+ // CHECKSUMS: the crcs for the data chunk. May be missing if
+ // checksums were not requested
+ // DATA the actual block data
+ Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
+
+ curPacketBuf.clear();
+ curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN);
+ doReadFully(ch, in, curPacketBuf);
+ curPacketBuf.flip();
+ int payloadLen = curPacketBuf.getInt();
+
+ if (payloadLen < Ints.BYTES) {
+ // The "payload length" includes its own length. Therefore it
+ // should never be less than 4 bytes
+ throw new IOException("Invalid payload length " +
+ payloadLen);
+ }
+ int dataPlusChecksumLen = payloadLen - Ints.BYTES;
+ int headerLen = curPacketBuf.getShort();
+ if (headerLen < 0) {
+ throw new IOException("Invalid header length " + headerLen);
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen +
+ " headerLen = " + headerLen);
+ }
+
+ // Sanity check the buffer size so we don't allocate too much memory
+ // and OOME.
+ int totalLen = payloadLen + headerLen;
+ if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) {
+ throw new IOException("Incorrect value for packet payload size: " +
+ payloadLen);
+ }
+
+ // Make sure we have space for the whole packet, and
+ // read it.
+ reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN +
+ dataPlusChecksumLen + headerLen);
+ curPacketBuf.clear();
+ curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
+ curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN +
+ dataPlusChecksumLen + headerLen);
+ doReadFully(ch, in, curPacketBuf);
+ curPacketBuf.flip();
+ curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
+
+ // Extract the header from the front of the buffer (after the length prefixes)
+ byte[] headerBuf = new byte[headerLen];
+ curPacketBuf.get(headerBuf);
+ if (curHeader == null) {
+ curHeader = new PacketHeader();
+ }
+ curHeader.setFieldsFromData(payloadLen, headerBuf);
+
+ // Compute the sub-slices of the packet
+ int checksumLen = dataPlusChecksumLen - curHeader.getDataLen();
+ if (checksumLen < 0) {
+ throw new IOException("Invalid packet: data length in packet header " +
+ "exceeds data length received. dataPlusChecksumLen=" +
+ dataPlusChecksumLen + " header: " + curHeader);
+ }
+
+ reslicePacket(headerLen, checksumLen, curHeader.getDataLen());
+ }
+
+ /**
+ * Rewrite the last-read packet on the wire to the given output stream.
+ */
+ public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
+ Preconditions.checkState(!useDirectBuffers,
+ "Currently only supported for non-direct buffers");
+ mirrorOut.write(curPacketBuf.array(),
+ curPacketBuf.arrayOffset(),
+ curPacketBuf.remaining());
+ }
+
+
+ private static void doReadFully(ReadableByteChannel ch, InputStream in,
+ ByteBuffer buf) throws IOException {
+ if (ch != null) {
+ readChannelFully(ch, buf);
+ } else {
+ Preconditions.checkState(!buf.isDirect(),
+ "Must not use direct buffers with InputStream API");
+ IOUtils.readFully(in, buf.array(),
+ buf.arrayOffset() + buf.position(),
+ buf.remaining());
+ buf.position(buf.position() + buf.remaining());
+ }
+ }
+
+ private void reslicePacket(
+ int headerLen, int checksumsLen, int dataLen) {
+ // Packet structure (refer to doRead() for details):
+ // PLEN HLEN HEADER CHECKSUMS DATA
+ // 32-bit 16-bit <protobuf> <variable length>
+ // |--- lenThroughHeader ----|
+ // |----------- lenThroughChecksums ----|
+ // |------------------- lenThroughData ------|
+ int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen;
+ int lenThroughChecksums = lenThroughHeader + checksumsLen;
+ int lenThroughData = lenThroughChecksums + dataLen;
+
+ assert dataLen >= 0 : "invalid datalen: " + dataLen;
+ assert curPacketBuf.position() == lenThroughHeader;
+ assert curPacketBuf.limit() == lenThroughData :
+ "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
+ " rem=" + curPacketBuf.remaining();
+
+ // Slice the checksums.
+ curPacketBuf.position(lenThroughHeader);
+ curPacketBuf.limit(lenThroughChecksums);
+ curChecksumSlice = curPacketBuf.slice();
+
+ // Slice the data.
+ curPacketBuf.position(lenThroughChecksums);
+ curPacketBuf.limit(lenThroughData);
+ curDataSlice = curPacketBuf.slice();
+
+ // Reset buffer to point to the entirety of the packet (including
+ // length prefixes)
+ curPacketBuf.position(0);
+ curPacketBuf.limit(lenThroughData);
+ }
+
+
+ private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
+ throws IOException {
+ while (buf.remaining() > 0) {
+ int n = ch.read(buf);
+ if (n < 0) {
+ throw new IOException("Premature EOF reading from " + ch);
+ }
+ }
+ }
+
+ private void reallocPacketBuf(int atLeastCapacity) {
+ // Realloc the buffer if this packet is longer than the previous
+ // one.
+ if (curPacketBuf == null ||
+ curPacketBuf.capacity() < atLeastCapacity) {
+ ByteBuffer newBuf;
+ if (useDirectBuffers) {
+ newBuf = bufferPool.getBuffer(atLeastCapacity);
+ } else {
+ newBuf = ByteBuffer.allocate(atLeastCapacity);
+ }
+ // If reallocing an existing buffer, copy the old packet length
+ // prefixes over
+ if (curPacketBuf != null) {
+ curPacketBuf.flip();
+ newBuf.put(curPacketBuf);
+ }
+
+ returnPacketBufToPool();
+ curPacketBuf = newBuf;
+ }
+ }
+
+ private void returnPacketBufToPool() {
+ if (curPacketBuf != null && curPacketBuf.isDirect()) {
+ bufferPool.returnBuffer(curPacketBuf);
+ curPacketBuf = null;
+ }
+ }
+
+ @Override // Closeable
+ public void close() {
+ returnPacketBufToPool();
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ try {
+ // just in case it didn't get closed, we
+ // may as well still try to return the buffer
+ returnPacketBufToPool();
+ } finally {
+ super.finalize();
+ }
+ }
+}
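
Aside: the position/limit/slice sequence in reslicePacket() is the standard way
to carve one buffer into adjacent sub-views. A minimal sketch with made-up
lengths (the class name is hypothetical):

    import java.nio.ByteBuffer;

    public class ResliceSketch {
      public static void main(String[] args) {
        // Layout mirrors the packet: [lengths][header][checksums][data].
        int lengthsLen = 6, headerLen = 10, checksumsLen = 8, dataLen = 32;
        ByteBuffer pkt =
            ByteBuffer.allocate(lengthsLen + headerLen + checksumsLen + dataLen);

        int lenThroughHeader = lengthsLen + headerLen;
        int lenThroughChecksums = lenThroughHeader + checksumsLen;
        int lenThroughData = lenThroughChecksums + dataLen;

        // Carve the checksum slice, as reslicePacket() does.
        pkt.position(lenThroughHeader);
        pkt.limit(lenThroughChecksums);
        ByteBuffer checksumSlice = pkt.slice();

        // Carve the data slice.
        pkt.position(lenThroughChecksums);
        pkt.limit(lenThroughData);
        ByteBuffer dataSlice = pkt.slice();

        System.out.println(checksumSlice.remaining()); // 8
        System.out.println(dataSlice.remaining());     // 32
      }
    }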
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
new file mode 100644
index 0000000..31d4dcc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * OutputStream that writes into a {@link ByteBuffer}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class ByteBufferOutputStream extends OutputStream {
+
+ private final ByteBuffer buf;
+
+ public ByteBufferOutputStream(ByteBuffer buf) {
+ this.buf = buf;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ buf.put((byte)b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ buf.put(b, off, len);
+ }
+}
\ No newline at end of file
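
Aside: a quick usage sketch for the class above (the wrapper class name is made
up). The stream writes into a fixed buffer and never grows it, so writing past
the buffer's limit throws a BufferOverflowException:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;

    public class BufferStreamSketch {
      public static void main(String[] args) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(16);
        OutputStream out = new ByteBufferOutputStream(buf);
        out.write(new byte[]{1, 2, 3}); // delegates to buf.put(b, off, len)
        out.write(4);                   // delegates to buf.put((byte) b)
        buf.flip();
        System.out.println(buf.remaining() + " bytes written"); // 4 bytes written
      }
    }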
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7b5979e..ef8fac5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -867,6 +867,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8980. Remove unnecessary block replacement in INodeFile. (jing9)
+ HDFS-8990. Move RemoteBlockReader to hdfs-client module.
+ (Mingliang via wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 3c49ef7..268a5b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -203,7 +203,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
DataEncryptionKeyFactory {
public static final Log LOG = LogFactory.getLog(DFSClient.class);
public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
- static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
private final Configuration conf;
private final DfsClientConf dfsClientConf;
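Note on the TCP_WINDOW_SIZE removal above: the deleted RemoteBlockReader
below still returns this constant from available(), so the value has to
move with the readers into the hdfs-client module. A plausible sketch of
the relocated code follows; the exact placement is an assumption, not
verified against the new files in this diff:

    // Hypothetical placement inside the relocated reader class: the
    // 128 KB estimate travels with the readers so that available()
    // no longer reaches back into DFSClient in hadoop-hdfs.
    static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB

    @Override
    public int available() {
      // An optimistic estimate of how much data can be read
      // without doing any network I/O.
      return TCP_WINDOW_SIZE;
    }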
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
deleted file mode 100644
index 015e154..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumSet;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FSInputChecker;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-
-
-/**
- * @deprecated this is an old implementation that is being left around
- * in case any issues spring up with the new {@link RemoteBlockReader2} implementation.
- * It will be removed in the next release.
- */
-@InterfaceAudience.Private
-@Deprecated
-public class RemoteBlockReader extends FSInputChecker implements BlockReader {
- private final Peer peer;
- private final DatanodeID datanodeID;
- private final DataInputStream in;
- private DataChecksum checksum;
-
- /** offset in block of the last chunk received */
- private long lastChunkOffset = -1;
- private long lastChunkLen = -1;
- private long lastSeqNo = -1;
-
- /** offset in block where reader wants to actually read */
- private long startOffset;
-
- private final long blockId;
-
- /** offset in block of first chunk - may be less than startOffset
- if startOffset is not chunk-aligned */
- private final long firstChunkOffset;
-
- private final int bytesPerChecksum;
- private final int checksumSize;
-
- /**
- * The total number of bytes we need to transfer from the DN.
- * This is the amount that the user has requested plus some padding
- * at the beginning so that the read can begin on a chunk boundary.
- */
- private final long bytesNeededToFinish;
-
- /**
- * True if we are reading from a local DataNode.
- */
- private final boolean isLocal;
-
- private boolean eos = false;
- private boolean sentStatusCode = false;
-
- ByteBuffer checksumBytes = null;
- /** Amount of unread data in the current received packet */
- int dataLeft = 0;
-
- private final PeerCache peerCache;
-
- /* FSInputChecker interface */
-
- /* Same interface as java.io.InputStream#read(); used by
- * DFSInputStream#read(). This violates one rule when there is a
- * checksum error: "Read should not modify user buffer before
- * successful read", because it first reads the data into the user
- * buffer and then checks the checksum.
- */
- @Override
- public synchronized int read(byte[] buf, int off, int len)
- throws IOException {
-
- // This has to be set here, *before* the skip, since we can
- // hit EOS during the skip, in the case that our entire read
- // is smaller than the checksum chunk.
- boolean eosBefore = eos;
-
- // For the first read, skip the extra bytes at the front.
- if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
- // Skip these bytes. But don't call this.skip()!
- int toSkip = (int)(startOffset - firstChunkOffset);
- if ( super.readAndDiscard(toSkip) != toSkip ) {
- // should never happen
- throw new IOException("Could not skip required number of bytes");
- }
- }
-
- int nRead = super.read(buf, off, len);
-
- // if eos was set in the previous read, send a status code to the DN
- if (eos && !eosBefore && nRead >= 0) {
- if (needChecksum()) {
- sendReadResult(peer, Status.CHECKSUM_OK);
- } else {
- sendReadResult(peer, Status.SUCCESS);
- }
- }
- return nRead;
- }
-
- @Override
- public synchronized long skip(long n) throws IOException {
- /* How can we make sure we don't throw a ChecksumException in the
- * majority of cases? This implementation throws. */
- long nSkipped = 0;
- while (nSkipped < n) {
- int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
- int ret = readAndDiscard(toSkip);
- if (ret <= 0) {
- return nSkipped;
- }
- nSkipped += ret;
- }
- return nSkipped;
- }
-
- @Override
- public int read() throws IOException {
- throw new IOException("read() is not expected to be invoked. " +
- "Use read(buf, off, len) instead.");
- }
-
- @Override
- public boolean seekToNewSource(long targetPos) throws IOException {
- /* Checksum errors are handled outside the BlockReader.
- * DFSInputStream does not always call 'seekToNewSource'. In the
- * case of pread(), it just tries a different replica without seeking.
- */
- return false;
- }
-
- @Override
- public void seek(long pos) throws IOException {
- throw new IOException("Seek() is not supported in BlockInputChecker");
- }
-
- @Override
- protected long getChunkPosition(long pos) {
- throw new RuntimeException("getChunkPosition() is not supported, " +
- "since seek is not required");
- }
-
- /**
- * Makes sure that checksumBytes has enough capacity
- * and that its limit is set to the number of checksum bytes that
- * need to be read.
- */
- private void adjustChecksumBytes(int dataLen) {
- int requiredSize =
- ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
- if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
- checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
- } else {
- checksumBytes.clear();
- }
- checksumBytes.limit(requiredSize);
- }
-
- @Override
- protected synchronized int readChunk(long pos, byte[] buf, int offset,
- int len, byte[] checksumBuf)
- throws IOException {
- TraceScope scope =
- Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")",
- Sampler.NEVER);
- try {
- return readChunkImpl(pos, buf, offset, len, checksumBuf);
- } finally {
- scope.close();
- }
- }
-
- private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
- int len, byte[] checksumBuf)
- throws IOException {
- // Read one chunk.
- if (eos) {
- // Already hit EOF
- return -1;
- }
-
- // Read one DATA_CHUNK.
- long chunkOffset = lastChunkOffset;
- if ( lastChunkLen > 0 ) {
- chunkOffset += lastChunkLen;
- }
-
- // pos is relative to the start of the first chunk of the read.
- // chunkOffset is relative to the start of the block.
- // This makes sure that the read passed from FSInputChecker is for
- // the same chunk we expect to be reading from the DN.
- if ( (pos + firstChunkOffset) != chunkOffset ) {
- throw new IOException("Mismatch in pos : " + pos + " + " +
- firstChunkOffset + " != " + chunkOffset);
- }
-
- // Read next packet if the previous packet has been read completely.
- if (dataLeft <= 0) {
- // Read the next packet header.
- PacketHeader header = new PacketHeader();
- header.readFields(in);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("DFSClient readChunk got header " + header);
- }
-
- // Sanity check the lengths
- if (!header.sanityCheck(lastSeqNo)) {
- throw new IOException("BlockReader: error in packet header " +
- header);
- }
-
- lastSeqNo = header.getSeqno();
- dataLeft = header.getDataLen();
- adjustChecksumBytes(header.getDataLen());
- if (header.getDataLen() > 0) {
- IOUtils.readFully(in, checksumBytes.array(), 0,
- checksumBytes.limit());
- }
- }
-
- // Sanity checks
- assert len >= bytesPerChecksum;
- assert checksum != null;
- assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
-
-
- int checksumsToRead, bytesToRead;
-
- if (checksumSize > 0) {
-
- // How many chunks left in our packet - this is a ceiling
- // since we may have a partial chunk at the end of the file
- int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
-
- // How many chunks we can fit in databuffer
- // - note this is a floor since we always read full chunks
- int chunksCanFit = Math.min(len / bytesPerChecksum,
- checksumBuf.length / checksumSize);
-
- // How many chunks should we read
- checksumsToRead = Math.min(chunksLeft, chunksCanFit);
- // How many bytes should we actually read
- bytesToRead = Math.min(
- checksumsToRead * bytesPerChecksum, // full chunks
- dataLeft); // in case we have a partial
- } else {
- // no checksum
- bytesToRead = Math.min(dataLeft, len);
- checksumsToRead = 0;
- }
-
- if ( bytesToRead > 0 ) {
- // Assert we have enough space
- assert bytesToRead <= len;
- assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
- assert checksumBuf.length >= checksumSize * checksumsToRead;
- IOUtils.readFully(in, buf, offset, bytesToRead);
- checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
- }
-
- dataLeft -= bytesToRead;
- assert dataLeft >= 0;
-
- lastChunkOffset = chunkOffset;
- lastChunkLen = bytesToRead;
-
- // If there's no data left in the current packet after satisfying
- // this read, and we have satisfied the client read, we expect
- // an empty packet header from the DN to signify this.
- // Note that pos + bytesToRead may in fact be greater since the
- // DN finishes off the entire last chunk.
- if (dataLeft == 0 &&
- pos + bytesToRead >= bytesNeededToFinish) {
-
- // Read header
- PacketHeader hdr = new PacketHeader();
- hdr.readFields(in);
-
- if (!hdr.isLastPacketInBlock() ||
- hdr.getDataLen() != 0) {
- throw new IOException("Expected empty end-of-read packet! Header: " +
- hdr);
- }
-
- eos = true;
- }
-
- if ( bytesToRead == 0 ) {
- return -1;
- }
-
- return bytesToRead;
- }
-
- private RemoteBlockReader(String file, String bpid, long blockId,
- DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
- long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
- DatanodeID datanodeID, PeerCache peerCache) {
- // Path is used only for printing block and file information in debug
- super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
- ":" + bpid + ":of:"+ file)/*too non path-like?*/,
- 1, verifyChecksum,
- checksum.getChecksumSize() > 0? checksum : null,
- checksum.getBytesPerChecksum(),
- checksum.getChecksumSize());
-
- this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
- createSocketAddr(datanodeID.getXferAddr()));
-
- this.peer = peer;
- this.datanodeID = datanodeID;
- this.in = in;
- this.checksum = checksum;
- this.startOffset = Math.max( startOffset, 0 );
- this.blockId = blockId;
-
- // The total number of bytes that we need to transfer from the DN is
- // the amount that the user wants (bytesToRead), plus the padding at
- // the beginning in order to chunk-align. Note that the DN may elect
- // to send more than this amount if the read starts/ends mid-chunk.
- this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-
- this.firstChunkOffset = firstChunkOffset;
- lastChunkOffset = firstChunkOffset;
- lastChunkLen = -1;
-
- bytesPerChecksum = this.checksum.getBytesPerChecksum();
- checksumSize = this.checksum.getChecksumSize();
- this.peerCache = peerCache;
- }
-
- /**
- * Create a new BlockReader specifically to satisfy a read.
- * This method also sends the OP_READ_BLOCK request.
- *
- * @param file File location
- * @param block The block object
- * @param blockToken The block token for security
- * @param startOffset The read offset, relative to block head
- * @param len The number of bytes to read
- * @param bufferSize The IO buffer size (not the client buffer size)
- * @param verifyChecksum Whether to verify checksum
- * @param clientName Client name
- * @return New BlockReader instance
- * @throws IOException if the reader cannot be created
- */
- public static RemoteBlockReader newBlockReader(String file,
- ExtendedBlock block,
- Token<BlockTokenIdentifier> blockToken,
- long startOffset, long len,
- int bufferSize, boolean verifyChecksum,
- String clientName, Peer peer,
- DatanodeID datanodeID,
- PeerCache peerCache,
- CachingStrategy cachingStrategy)
- throws IOException {
- // in and out will be closed when sock is closed (by the caller)
- final DataOutputStream out =
- new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
- new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
- verifyChecksum, cachingStrategy);
-
- //
- // Get bytes in block, set streams
- //
-
- DataInputStream in = new DataInputStream(
- new BufferedInputStream(peer.getInputStream(), bufferSize));
-
- BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
- PBHelperClient.vintPrefixed(in));
- RemoteBlockReader2.checkSuccess(status, peer, block, file);
- ReadOpChecksumInfoProto checksumInfo =
- status.getReadOpChecksumInfo();
- DataChecksum checksum = DataTransferProtoUtil.fromProto(
- checksumInfo.getChecksum());
- // TODO: should we warn when we get CHECKSUM_NULL?
-
- // Read the first chunk offset.
- long firstChunkOffset = checksumInfo.getChunkOffset();
-
- if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
- firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
- throw new IOException("BlockReader: error in first chunk offset (" +
- firstChunkOffset + ") startOffset is " +
- startOffset + " for file " + file);
- }
-
- return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
- in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
- peer, datanodeID, peerCache);
- }
-
- @Override
- public synchronized void close() throws IOException {
- startOffset = -1;
- checksum = null;
- if (peerCache != null && sentStatusCode) {
- peerCache.put(datanodeID, peer);
- } else {
- peer.close();
- }
-
- // in will be closed when its Socket is closed.
- }
-
- @Override
- public void readFully(byte[] buf, int readOffset, int amtToRead)
- throws IOException {
- IOUtils.readFully(this, buf, readOffset, amtToRead);
- }
-
- @Override
- public int readAll(byte[] buf, int offset, int len) throws IOException {
- return readFully(this, buf, offset, len);
- }
-
- /**
- * When the reader reaches end of the read, it sends a status response
- * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
- * closing our connection (which we will re-open), but won't affect
- * data correctness.
- */
- void sendReadResult(Peer peer, Status statusCode) {
- assert !sentStatusCode : "already sent status code to " + peer;
- try {
- RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
- sentStatusCode = true;
- } catch (IOException e) {
- // It's ok not to be able to send this. But something is probably wrong.
- LOG.info("Could not send read status (" + statusCode + ") to datanode " +
- peer.getRemoteAddressString() + ": " + e.getMessage());
- }
- }
-
- @Override
- public int read(ByteBuffer buf) throws IOException {
- throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
- }
-
- @Override
- public int available() throws IOException {
- // An optimistic estimate of how much data is available
- // to us without doing network I/O.
- return DFSClient.TCP_WINDOW_SIZE;
- }
-
- @Override
- public boolean isLocal() {
- return isLocal;
- }
-
- @Override
- public boolean isShortCircuit() {
- return false;
- }
-
- @Override
- public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
- return null;
- }
-}
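To make the chunk-alignment arithmetic in the constructor and the first
read() above concrete, here is a small worked sketch. The offsets are
made-up numbers for illustration only:

    // A client asks for 100 bytes starting at block offset 700,
    // with 512-byte checksum chunks.
    long startOffset = 700;      // where the user wants to start reading
    long firstChunkOffset = 512; // chunk boundary at or below startOffset
    long bytesToRead = 100;      // what the user asked for

    // The DN always starts sending at a chunk boundary, so the reader
    // must transfer the padding before startOffset as well:
    long bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
    // = 100 + (700 - 512) = 288 bytes from the DN

    // On the first read(), that padding is discarded up front:
    int toSkip = (int) (startOffset - firstChunkOffset); // 188 bytes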