Posted to hdfs-commits@hadoop.apache.org by sh...@apache.org on 2012/04/23 23:37:56 UTC
svn commit: r1329468 [1/2] - in /hadoop/common/branches/branch-0.22/hdfs: ./
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/common/
src/java/org/apache/hadoop/hdfs/server/datanode/ src/...
Author: shv
Date: Mon Apr 23 21:37:55 2012
New Revision: 1329468
URL: http://svn.apache.org/viewvc?rev=1329468&view=rev
Log:
HDFS-2246. Enable reading a block directly from local file system for a client on the same node as the block file. Contributed by Andrew Purtell, Suresh, Jitendra and Benoy
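For context: the feature is off by default and is driven by the client configuration keys added to DFSConfigKeys below. A minimal client-side sketch of enabling it (not part of this patch; the file path is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShortCircuitReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Client side: read blocks straight from the local file system
    // when the replica lives on this machine (default: false).
    conf.setBoolean("dfs.client.read.shortcircuit", true);
    // Optionally skip client-side checksum verification (default: false).
    conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    // The datanode must also permit the reading user via
    // "dfs.block.local-path-access.user" in its own configuration.
    FileSystem fs = FileSystem.get(conf);
    FSDataInputStream in = fs.open(new Path("/some/hdfs/file")); // illustrative path
    byte[] buf = new byte[4096];
    int n = in.read(buf);
    System.out.println("read " + n + " bytes");
    in.close();
  }
}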
Added:
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (with props)
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (with props)
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java (with props)
hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (with props)
Modified:
hadoop/common/branches/branch-0.22/hdfs/CHANGES.txt
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests
hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java
hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
Modified: hadoop/common/branches/branch-0.22/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/CHANGES.txt?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.22/hdfs/CHANGES.txt Mon Apr 23 21:37:55 2012
@@ -16,6 +16,10 @@ Release 0.22.1 - Unreleased
HDFS-1601. Pipeline ACKs are sent as lots of tiny TCP packets (todd)
+ HDFS-2246. Enable reading a block directly from local file system
+ for a client on the same node as the block file. (Andrew Purtell,
+ Suresh Srinivas, Jitendra Nath Pandey and Benoy Antony via shv)
+
BUG FIXES
HDFS-1910. NameNode should not save fsimage twice. (shv)
Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java Mon Apr 23 21:37:55 2012
@@ -17,507 +17,30 @@
*/
package org.apache.hadoop.hdfs;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.Closeable;
import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
import java.net.Socket;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FSInputChecker;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-
-/** This is a wrapper around connection to datanode
- * and understands checksum, offset etc.
- *
- * Terminology:
- * <dl>
- * <dt>block</dt>
- * <dd>The hdfs block, typically large (~64MB).
- * </dd>
- * <dt>chunk</dt>
- * <dd>A block is divided into chunks, each comes with a checksum.
- * We want transfers to be chunk-aligned, to be able to
- * verify checksums.
- * </dd>
- * <dt>packet</dt>
- * <dd>A grouping of chunks used for transport. It contains a
- * header, followed by checksum data, followed by real data.
- * </dd>
- * </dl>
- * Please see DataNode for the RPC specification.
- */
-@InterfaceAudience.Private
-public class BlockReader extends FSInputChecker {
-
- Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
- private DataInputStream in;
- private DataChecksum checksum;
-
- /** offset in block of the last chunk received */
- private long lastChunkOffset = -1;
- private long lastChunkLen = -1;
- private long lastSeqNo = -1;
-
- /** offset in block where reader wants to actually read */
- private long startOffset;
- /** offset in block of of first chunk - may be less than startOffset
- if startOffset is not chunk-aligned */
- private final long firstChunkOffset;
-
- private int bytesPerChecksum;
- private int checksumSize;
/**
- * The total number of bytes we need to transfer from the DN.
- * This is the amount that the user has requested plus some padding
- * at the beginning so that the read can begin on a chunk boundary.
- */
- private final long bytesNeededToFinish;
-
- private boolean eos = false;
- private boolean sentStatusCode = false;
-
- byte[] skipBuf = null;
- ByteBuffer checksumBytes = null;
- /** Amount of unread data in the current received packet */
- int dataLeft = 0;
-
- /* FSInputChecker interface */
-
- /* same interface as inputStream java.io.InputStream#read()
- * used by DFSInputStream#read()
- * This violates one rule when there is a checksum error:
- * "Read should not modify user buffer before successful read"
- * because it first reads the data to user buffer and then checks
- * the checksum.
+ * The API shared between local and remote block readers.
*/
- @Override
- public synchronized int read(byte[] buf, int off, int len)
- throws IOException {
-
- // This has to be set here, *before* the skip, since we can
- // hit EOS during the skip, in the case that our entire read
- // is smaller than the checksum chunk.
- boolean eosBefore = eos;
+public interface BlockReader extends Closeable {
- //for the first read, skip the extra bytes at the front.
- if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
- // Skip these bytes. But don't call this.skip()!
- int toSkip = (int)(startOffset - firstChunkOffset);
- if ( skipBuf == null ) {
- skipBuf = new byte[bytesPerChecksum];
- }
- if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
- // should never happen
- throw new IOException("Could not skip required number of bytes");
- }
- }
-
- int nRead = super.read(buf, off, len);
+ public int read(byte[] buf, int off, int len) throws IOException;
- // if eos was set in the previous read, send a status code to the DN
- if (eos && !eosBefore && nRead >= 0) {
- if (needChecksum()) {
- sendReadResult(dnSock, CHECKSUM_OK);
- } else {
- sendReadResult(dnSock, SUCCESS);
- }
- }
- return nRead;
- }
+ public int readAll(byte[] buf, int offset, int len) throws IOException;
- @Override
- public synchronized long skip(long n) throws IOException {
- /* How can we make sure we don't throw a ChecksumException, at least
- * in majority of the cases?. This one throws. */
- if ( skipBuf == null ) {
- skipBuf = new byte[bytesPerChecksum];
- }
-
- long nSkipped = 0;
- while ( nSkipped < n ) {
- int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
- int ret = read(skipBuf, 0, toSkip);
- if ( ret <= 0 ) {
- return nSkipped;
- }
- nSkipped += ret;
- }
- return nSkipped;
- }
-
- @Override
- public int read() throws IOException {
- throw new IOException("read() is not expected to be invoked. " +
- "Use read(buf, off, len) instead.");
- }
-
- @Override
- public boolean seekToNewSource(long targetPos) throws IOException {
- /* Checksum errors are handled outside the BlockReader.
- * DFSInputStream does not always call 'seekToNewSource'. In the
- * case of pread(), it just tries a different replica without seeking.
- */
- return false;
- }
-
- @Override
- public void seek(long pos) throws IOException {
- throw new IOException("Seek() is not supported in BlockInputChecker");
- }
-
- @Override
- protected long getChunkPosition(long pos) {
- throw new RuntimeException("getChunkPosition() is not supported, " +
- "since seek is not required");
- }
-
- /**
- * Makes sure that checksumBytes has enough capacity
- * and limit is set to the number of checksum bytes needed
- * to be read.
- */
- private void adjustChecksumBytes(int dataLen) {
- int requiredSize =
- ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
- if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
- checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
- } else {
- checksumBytes.clear();
- }
- checksumBytes.limit(requiredSize);
- }
-
- @Override
- protected synchronized int readChunk(long pos, byte[] buf, int offset,
- int len, byte[] checksumBuf)
- throws IOException {
- // Read one chunk.
- if (eos) {
- // Already hit EOF
- return -1;
- }
-
- // Read one DATA_CHUNK.
- long chunkOffset = lastChunkOffset;
- if ( lastChunkLen > 0 ) {
- chunkOffset += lastChunkLen;
- }
-
- // pos is relative to the start of the first chunk of the read.
- // chunkOffset is relative to the start of the block.
- // This makes sure that the read passed from FSInputChecker is the
- // for the same chunk we expect to be reading from the DN.
- if ( (pos + firstChunkOffset) != chunkOffset ) {
- throw new IOException("Mismatch in pos : " + pos + " + " +
- firstChunkOffset + " != " + chunkOffset);
- }
-
- // Read next packet if the previous packet has been read completely.
- if (dataLeft <= 0) {
- //Read packet headers.
- PacketHeader header = new PacketHeader();
- header.readFields(in);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("DFSClient readChunk got header " + header);
- }
-
- // Sanity check the lengths
- if (!header.sanityCheck(lastSeqNo)) {
- throw new IOException("BlockReader: error in packet header " +
- header);
- }
-
- lastSeqNo = header.getSeqno();
- dataLeft = header.getDataLen();
- adjustChecksumBytes(header.getDataLen());
- if (header.getDataLen() > 0) {
- IOUtils.readFully(in, checksumBytes.array(), 0,
- checksumBytes.limit());
- }
- }
-
- // Sanity checks
- assert len >= bytesPerChecksum;
- assert checksum != null;
- assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
-
-
- int checksumsToRead, bytesToRead;
-
- if (checksumSize > 0) {
-
- // How many chunks left in our packet - this is a ceiling
- // since we may have a partial chunk at the end of the file
- int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
-
- // How many chunks we can fit in databuffer
- // - note this is a floor since we always read full chunks
- int chunksCanFit = Math.min(len / bytesPerChecksum,
- checksumBuf.length / checksumSize);
-
- // How many chunks should we read
- checksumsToRead = Math.min(chunksLeft, chunksCanFit);
- // How many bytes should we actually read
- bytesToRead = Math.min(
- checksumsToRead * bytesPerChecksum, // full chunks
- dataLeft); // in case we have a partial
- } else {
- // no checksum
- bytesToRead = Math.min(dataLeft, len);
- checksumsToRead = 0;
- }
-
- if ( bytesToRead > 0 ) {
- // Assert we have enough space
- assert bytesToRead <= len;
- assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
- assert checksumBuf.length >= checksumSize * checksumsToRead;
- IOUtils.readFully(in, buf, offset, bytesToRead);
- checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
- }
-
- dataLeft -= bytesToRead;
- assert dataLeft >= 0;
-
- lastChunkOffset = chunkOffset;
- lastChunkLen = bytesToRead;
-
- // If there's no data left in the current packet after satisfying
- // this read, and we have satisfied the client read, we expect
- // an empty packet header from the DN to signify this.
- // Note that pos + bytesToRead may in fact be greater since the
- // DN finishes off the entire last chunk.
- if (dataLeft == 0 &&
- pos + bytesToRead >= bytesNeededToFinish) {
-
- // Read header
- int packetLen = in.readInt();
- long offsetInBlock = in.readLong();
- long seqno = in.readLong();
- boolean lastPacketInBlock = in.readBoolean();
- int dataLen = in.readInt();
-
- if (!lastPacketInBlock ||
- dataLen != 0) {
- throw new IOException("Expected empty end-of-read packet! Header: " +
- "(packetLen : " + packetLen +
- ", offsetInBlock : " + offsetInBlock +
- ", seqno : " + seqno +
- ", lastInBlock : " + lastPacketInBlock +
- ", dataLen : " + dataLen);
- }
-
- eos = true;
- }
-
- if ( bytesToRead == 0 ) {
- return -1;
- }
-
- return bytesToRead;
- }
-
- private BlockReader( String file, long blockId, DataInputStream in,
- DataChecksum checksum, boolean verifyChecksum,
- long startOffset, long firstChunkOffset,
- long bytesToRead,
- Socket dnSock ) {
- // Path is used only for printing block and file information in debug
- super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
- 1, verifyChecksum,
- checksum.getChecksumSize() > 0? checksum : null,
- checksum.getBytesPerChecksum(),
- checksum.getChecksumSize());
-
- this.dnSock = dnSock;
- this.in = in;
- this.checksum = checksum;
- this.startOffset = Math.max( startOffset, 0 );
-
- // The total number of bytes that we need to transfer from the DN is
- // the amount that the user wants (bytesToRead), plus the padding at
- // the beginning in order to chunk-align. Note that the DN may elect
- // to send more than this amount if the read starts/ends mid-chunk.
- this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-
- this.firstChunkOffset = firstChunkOffset;
- lastChunkOffset = firstChunkOffset;
- lastChunkLen = -1;
-
- bytesPerChecksum = this.checksum.getBytesPerChecksum();
- checksumSize = this.checksum.getChecksumSize();
- }
-
- public static BlockReader newBlockReader(Socket sock, String file,
- Block block, Token<BlockTokenIdentifier> blockToken,
- long startOffset, long len, int bufferSize) throws IOException {
- return newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize,
- true);
- }
-
- /** Java Doc required */
- public static BlockReader newBlockReader( Socket sock, String file,
- Block block,
- Token<BlockTokenIdentifier> blockToken,
- long startOffset, long len,
- int bufferSize, boolean verifyChecksum)
- throws IOException {
- return newBlockReader(sock, file, block, blockToken, startOffset,
- len, bufferSize, verifyChecksum, "");
- }
-
- /**
- * Create a new BlockReader specifically to satisfy a read.
- * This method also sends the OP_READ_BLOCK request.
- *
- * @param sock An established Socket to the DN. The BlockReader will not close it normally
- * @param file File location
- * @param block The block object
- * @param blockToken The block token for security
- * @param startOffset The read offset, relative to block head
- * @param len The number of bytes to read
- * @param bufferSize The IO buffer size (not the client buffer size)
- * @param verifyChecksum Whether to verify checksum
- * @param clientName Client name
- * @return New BlockReader instance, or null on error.
- */
- public static BlockReader newBlockReader( Socket sock, String file,
- Block block,
- Token<BlockTokenIdentifier> blockToken,
- long startOffset, long len,
- int bufferSize, boolean verifyChecksum,
- String clientName)
- throws IOException {
- // in and out will be closed when sock is closed (by the caller)
- DataTransferProtocol.Sender.opReadBlock(
- new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
- block, startOffset, len, clientName, blockToken);
-
- //
- // Get bytes in block, set streams
- //
-
- DataInputStream in = new DataInputStream(
- new BufferedInputStream(NetUtils.getInputStream(sock),
- bufferSize));
-
- DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
- if (status != SUCCESS) {
- if (status == ERROR_ACCESS_TOKEN) {
- throw new InvalidBlockTokenException(
- "Got access token error for OP_READ_BLOCK, self="
- + sock.getLocalSocketAddress() + ", remote="
- + sock.getRemoteSocketAddress() + ", for file " + file
- + ", for block " + block.getBlockId()
- + "_" + block.getGenerationStamp());
- } else {
- throw new IOException("Got error for OP_READ_BLOCK, self="
- + sock.getLocalSocketAddress() + ", remote="
- + sock.getRemoteSocketAddress() + ", for file " + file
- + ", for block " + block.getBlockId() + "_"
- + block.getGenerationStamp());
- }
- }
- DataChecksum checksum = DataChecksum.newDataChecksum( in );
- //Warning when we get CHECKSUM_NULL?
-
- // Read the first chunk offset.
- long firstChunkOffset = in.readLong();
-
- if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
- firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
- throw new IOException("BlockReader: error in first chunk offset (" +
- firstChunkOffset + ") startOffset is " +
- startOffset + " for file " + file);
- }
-
- return new BlockReader(file, block.getBlockId(), in, checksum,
- verifyChecksum, startOffset, firstChunkOffset, len, sock);
- }
-
- @Override
- public synchronized void close() throws IOException {
- startOffset = -1;
- checksum = null;
- if (dnSock != null) {
- dnSock.close();
- }
-
- // in will be closed when its Socket is closed.
- }
-
- /** kind of like readFully(). Only reads as much as possible.
- * And allows use of protected readFully().
- */
- public int readAll(byte[] buf, int offset, int len) throws IOException {
- return readFully(this, buf, offset, len);
- }
+ public long skip(long n) throws IOException;
/**
* Take the socket used to talk to the DN.
*/
- public Socket takeSocket() {
- assert hasSentStatusCode() :
- "BlockReader shouldn't give back sockets mid-read";
- Socket res = dnSock;
- dnSock = null;
- return res;
- }
+ Socket takeSocket();
/**
* Whether the BlockReader has reached the end of its input stream
* and successfully sent a status code back to the datanode.
*/
- public boolean hasSentStatusCode() {
- return sentStatusCode;
- }
-
- /**
- * When the reader reaches end of the read, it sends a status response
- * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
- * closing our connection (which we will re-open), but won't affect
- * data correctness.
- */
- void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
- assert !sentStatusCode : "already sent status code to " + sock;
- try {
- OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
- statusCode.writeOutputStream(out);
- out.flush();
- sentStatusCode = true;
- } catch (IOException e) {
- // It's ok not to be able to send this. But something is probably wrong.
- LOG.info("Could not send read status (" + statusCode + ") to datanode " +
- sock.getInetAddress() + ": " + e.getMessage());
- }
- }
-
- // File name to print when accessing a block directory from servlets
- public static String getFileName(final InetSocketAddress s,
- final long blockId) {
- return s.toString() + ":" + blockId;
- }
+ boolean hasSentStatusCode();
}
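The net effect of this hunk: BlockReader shrinks from a concrete FSInputChecker subclass to a small interface implemented by both the local and remote readers. A sketch of a consumer written against the interface alone (copyBlock and the buffer size are illustrative, not part of this patch):

import java.io.IOException;
import java.io.OutputStream;

class BlockReaderSketch {
  // Drain a block through any BlockReader implementation; the caller
  // no longer needs to know whether the bytes come from a local file
  // or a datanode socket.
  static void copyBlock(BlockReader reader, OutputStream out, long blockLen)
      throws IOException {
    byte[] buf = new byte[64 * 1024]; // illustrative buffer size
    long remaining = blockLen;
    while (remaining > 0) {
      int n = reader.read(buf, 0, (int) Math.min(buf.length, remaining));
      if (n < 0) {
        break; // end of block
      }
      out.write(buf, 0, n);
      remaining -= n;
    }
    reader.close(); // Closeable: releases the underlying stream or socket
  }
}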
Added: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1329468&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (added)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Mon Apr 23 21:37:55 2012
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+/**
+ * BlockReaderLocal enables local short-circuit reads. If the DFS client is on
+ * the same machine as the datanode, then the client can read files directly
+ * from the local file system rather than going through the datanode for better
+ * performance. <br>
+ * {@link BlockReaderLocal} works as follows:
+ * <ul>
+ * <li>The client performing short-circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the path to the file where the block is stored using the
+ * {@link ClientDatanodeProtocol#getBlockLocalPathInfo(Block, Token)} RPC call.</li>
+ * <li>The client uses kerberos authentication to connect to the datanode over
+ * RPC, if security is enabled.</li>
+ * </ul>
+ */
+class BlockReaderLocal extends FSInputChecker implements BlockReader {
+ public static final Log LOG = LogFactory.getLog(BlockReaderLocal.class);
+
+
+ private DataChecksum checksum;
+ private int bytesPerChecksum;
+ private int checksumSize;
+ private long firstChunkOffset;
+ private long lastChunkLen = -1;
+ private long lastChunkOffset = -1;
+ private long startOffset;
+ private boolean eos = false;
+ private byte[] skipBuf = null;
+
+
+
+ //Stores the cache and proxy for a local datanode.
+ private static class LocalDatanodeInfo {
+ private ClientDatanodeProtocol proxy = null;
+ private final Map<Block, BlockLocalPathInfo> cache;
+
+ LocalDatanodeInfo() {
+ final int cacheSize = 10000;
+ final float hashTableLoadFactor = 0.75f;
+ int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
+ cache = Collections
+ .synchronizedMap(new LinkedHashMap<Block, BlockLocalPathInfo>(
+ hashTableCapacity, hashTableLoadFactor, true) {
+ private static final long serialVersionUID = 1;
+
+ @Override
+ protected boolean removeEldestEntry(
+ Map.Entry<Block, BlockLocalPathInfo> eldest) {
+ return size() > cacheSize;
+ }
+ });
+ }
+
+ private synchronized ClientDatanodeProtocol getDatanodeProxy(
+ DatanodeInfo node, Configuration conf, int socketTimeout)
+ throws IOException {
+ if (proxy == null) {
+ proxy = DFSClient.createClientDatanodeProtocolProxy(node, conf,
+ socketTimeout);
+ }
+ return proxy;
+ }
+
+ private synchronized void resetDatanodeProxy() {
+ if (null != proxy) {
+ RPC.stopProxy(proxy);
+ proxy = null;
+ }
+ }
+
+ private BlockLocalPathInfo getBlockLocalPathInfo(Block b) {
+ return cache.get(b);
+ }
+
+ private void setBlockLocalPathInfo(Block b, BlockLocalPathInfo info) {
+ cache.put(b, info);
+ }
+
+ private void removeBlockLocalPathInfo(Block b) {
+ cache.remove(b);
+ }
+ }
+
+ // Multiple datanodes could be running on the local machine. Store proxies in
+ // a map keyed by the ipc port of the datanode.
+ private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+
+ private FileInputStream dataIn; // reader for the data file
+ private FileInputStream checksumIn; // reader for the checksum file
+
+ /**
+ * The only way this object can be instantiated.
+ */
+ static BlockReaderLocal newBlockReader(Configuration conf,
+ String file, Block blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
+ int socketTimeout, long startOffset, long length) throws IOException {
+
+ LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
+ // check the cache first
+ BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
+ if (pathinfo == null) {
+ pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
+ }
+
+ // check to see if the file exists. It may so happen that the
+ // HDFS file has been deleted and this block-lookup is occurring
+ // on behalf of a new HDFS file. This time, the block file could
+ // be residing in a different portion of the fs.data.dir directory.
+ // In this case, we remove this entry from the cache. The next
+ // call to this method will re-populate the cache.
+ FileInputStream dataIn = null;
+ FileInputStream checksumIn = null;
+ BlockReaderLocal localBlockReader = null;
+ boolean skipChecksum = skipChecksumCheck(conf);
+ try {
+ // get a local file system
+ File blkfile = new File(pathinfo.getBlockPath());
+ dataIn = new FileInputStream(blkfile);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
+ + blkfile.length() + " startOffset " + startOffset + " length "
+ + length + " short circuit checksum " + skipChecksum);
+ }
+
+ if (!skipChecksum) {
+ // get the metadata file
+ File metafile = new File(pathinfo.getMetaPath());
+ checksumIn = new FileInputStream(metafile);
+
+ // read and handle the common header here. For now just a version
+ BlockMetadataHeader header = BlockMetadataHeader
+ .readHeader(new DataInputStream(checksumIn));
+ short version = header.getVersion();
+ if (version != FSDataset.METADATA_VERSION) {
+ LOG.warn("Wrong version (" + version + ") for metadata file for "
+ + blk + " ignoring ...");
+ }
+ DataChecksum checksum = header.getChecksum();
+ localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
+ pathinfo, checksum, true, dataIn, checksumIn);
+ } else {
+ localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
+ pathinfo, dataIn);
+ }
+ } catch (IOException e) {
+ // remove from cache
+ localDatanodeInfo.removeBlockLocalPathInfo(blk);
+ DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk +
+ " from cache because local file " + pathinfo.getBlockPath() +
+ " could not be opened.");
+ throw e;
+ } finally {
+ if (localBlockReader == null) {
+ if (dataIn != null) {
+ dataIn.close();
+ }
+ if (checksumIn != null) {
+ checksumIn.close();
+ }
+ }
+ }
+ return localBlockReader;
+ }
+
+ private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
+ LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
+ if (ldInfo == null) {
+ ldInfo = new LocalDatanodeInfo();
+ localDatanodeInfoMap.put(port, ldInfo);
+ }
+ return ldInfo;
+ }
+
+ private static BlockLocalPathInfo getBlockPathInfo(Block blk,
+ DatanodeInfo node, Configuration conf, int timeout,
+ Token<BlockTokenIdentifier> token) throws IOException {
+ LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.ipcPort);
+ BlockLocalPathInfo pathinfo = null;
+ ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
+ conf, timeout);
+ try {
+ // make RPC to local datanode to find local pathnames of blocks
+ pathinfo = proxy.getBlockLocalPathInfo(blk, token);
+ if (pathinfo != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cached location of block " + blk + " as " + pathinfo);
+ }
+ localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
+ }
+ } catch (IOException e) {
+ localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
+ throw e;
+ }
+ return pathinfo;
+ }
+
+ private static boolean skipChecksumCheck(Configuration conf) {
+ return conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+ }
+
+ private BlockReaderLocal(Configuration conf, String hdfsfile, Block block,
+ Token<BlockTokenIdentifier> token, long startOffset, long length,
+ BlockLocalPathInfo pathinfo, FileInputStream dataIn) throws IOException {
+ super(
+ new Path("/blk_" + block.getBlockId() + ":of:" + hdfsfile) /*too non path-like?*/,
+ 1);
+ this.startOffset = startOffset;
+ this.dataIn = dataIn;
+ long toSkip = startOffset;
+ while (toSkip > 0) {
+ long skipped = dataIn.skip(toSkip);
+ if (skipped == 0) {
+ throw new IOException("Couldn't initialize input stream");
+ }
+ toSkip -= skipped;
+ }
+ }
+
+ private BlockReaderLocal(Configuration conf, String hdfsfile, Block block,
+ Token<BlockTokenIdentifier> token, long startOffset, long length,
+ BlockLocalPathInfo pathinfo, DataChecksum checksum, boolean verifyChecksum,
+ FileInputStream dataIn, FileInputStream checksumIn) throws IOException {
+ super(
+ new Path("/blk_" + block.getBlockId() + ":of:" + hdfsfile) /*too non path-like?*/,
+ 1,
+ verifyChecksum,
+ checksum.getChecksumSize() > 0? checksum : null,
+ checksum.getBytesPerChecksum(),
+ checksum.getChecksumSize());
+ this.startOffset = startOffset;
+ this.dataIn = dataIn;
+ this.checksumIn = checksumIn;
+ this.checksum = checksum;
+
+ long blockLength = pathinfo.getNumBytes();
+
+ /* If bytesPerChecksum is very large, then the metadata file
+ * is most likely corrupted. For now just truncate bytesPerChecksum to
+ * blockLength.
+ */
+ bytesPerChecksum = checksum.getBytesPerChecksum();
+ if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
+ checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
+ Math.max((int) blockLength, 10 * 1024 * 1024));
+ bytesPerChecksum = checksum.getBytesPerChecksum();
+ }
+
+ checksumSize = checksum.getChecksumSize();
+
+ if (startOffset < 0 || startOffset > blockLength
+ || (length + startOffset) > blockLength) {
+ String msg = " Offset " + startOffset + " and length " + length
+ + " don't match block " + block + " ( blockLen " + blockLength + " )";
+ LOG.warn("BlockReaderLocal requested with incorrect offset: " + msg);
+ throw new IOException(msg);
+ }
+
+ firstChunkOffset = (startOffset - (startOffset % bytesPerChecksum));
+
+ if (firstChunkOffset > 0) {
+ dataIn.getChannel().position(firstChunkOffset);
+
+ long checksumSkip = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+ if (checksumSkip > 0) {
+ checksumIn.skip(checksumSkip);
+ }
+ }
+
+ lastChunkOffset = firstChunkOffset;
+ lastChunkLen = -1;
+ }
+
+ @Override
+ public synchronized int read(byte[] buf, int off, int len) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("read off " + off + " len " + len);
+ }
+ if (checksum == null) {
+ return dataIn.read(buf, off, len);
+ }
+ // For the first read, skip the extra bytes at the front.
+ if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+ // Skip these bytes. But don't call this.skip()!
+ int toSkip = (int)(startOffset - firstChunkOffset);
+ if (skipBuf == null) {
+ skipBuf = new byte[bytesPerChecksum];
+ }
+ if (super.read(skipBuf, 0, toSkip) != toSkip) {
+ // Should never happen
+ throw new IOException("Could not skip " + toSkip + " bytes");
+ }
+ }
+ return super.read(buf, off, len);
+ }
+
+ @Override
+ public int readAll(byte[] buf, int offset, int len) throws IOException {
+ return readFully(this, buf, offset, len);
+ }
+
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("skip " + n);
+ }
+ if (checksum == null) {
+ return dataIn.skip(n);
+ }
+ // Skip by reading the data so we stay in sync with checksums.
+ // This could be implemented more efficiently in the future to
+ // skip to the beginning of the appropriate checksum chunk
+ // and then only read to the middle of that chunk.
+ if (skipBuf == null) {
+ skipBuf = new byte[bytesPerChecksum];
+ }
+ long nSkipped = 0;
+ while (nSkipped < n) {
+ int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+ int ret = read(skipBuf, 0, toSkip);
+ if (ret <= 0) {
+ return nSkipped;
+ }
+ nSkipped += ret;
+ }
+ return nSkipped;
+ }
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ // Checksum errors are handled outside BlockReaderLocal
+ return false;
+ }
+
+ @Override
+ protected long getChunkPosition(long pos) {
+ throw new RuntimeException("getChunkPosition() is not supported, " +
+ "since seek is not implemented");
+ }
+
+ @Override
+ public synchronized void seek(long n) throws IOException {
+ throw new IOException("Seek() is not supported in BlockReaderLocal");
+ }
+
+ @Override
+ protected synchronized int readChunk(long pos, byte[] buf, int offset,
+ int len, byte[] checksumBuf) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reading chunk from position " + pos + " at offset " +
+ offset + " with length " + len);
+ }
+
+ if (eos) {
+ startOffset = -1;
+ return -1;
+ }
+
+ if ((pos + firstChunkOffset) != lastChunkOffset) {
+ throw new IOException("Mismatch in pos : " + pos + " + "
+ + firstChunkOffset + " != " + lastChunkOffset);
+ }
+
+ int checksumsToRead, bytesToRead;
+ int nRead = 0;
+
+ if (checksumIn != null) {
+
+ // How many chunks we can fit in the data buffer and the checksum buffer
+ int chunksCanFit = Math.min(len / bytesPerChecksum, checksumBuf.length / checksumSize);
+
+ // Compute the number of bytes to read
+ bytesToRead = chunksCanFit * bytesPerChecksum;
+
+ nRead = dataIn.read(buf, offset, bytesToRead);
+
+ // Now compute the number of checksums to read
+ checksumsToRead = Math.min(((nRead - 1) / bytesPerChecksum) + 1, chunksCanFit);
+
+ int nChecksumRead = checksumIn.read(checksumBuf, 0, checksumSize * checksumsToRead);
+
+ if (nChecksumRead != checksumSize * checksumsToRead) {
+ throw new IOException("Could not read checksum at offset " +
+ checksumIn.getChannel().position() + " from the meta file.");
+ }
+ }
+ else {
+ nRead = dataIn.read(buf, offset, len);
+ }
+
+
+ if (nRead < bytesPerChecksum) {
+ eos = true;
+ }
+
+ lastChunkOffset += nRead;
+ lastChunkLen = nRead;
+
+ return nRead;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ IOUtils.closeStream(dataIn);
+ IOUtils.closeStream(checksumIn);
+ }
+
+ @Override
+ public Socket takeSocket() {
+ return null;
+ }
+
+ @Override
+ public boolean hasSentStatusCode() {
+ return false;
+ }
+}
\ No newline at end of file
Propchange: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
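LocalDatanodeInfo above bounds its Block-to-BlockLocalPathInfo cache with an access-ordered LinkedHashMap whose removeEldestEntry hook evicts the least recently used mapping once the cache exceeds 10000 entries. The same idiom in isolation (a sketch; the generic names are illustrative):

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class LruCacheSketch {
  // Bounded LRU map, as used for Block -> BlockLocalPathInfo above.
  // Sizing the table for maxEntries at a 0.75 load factor avoids a
  // rehash before eviction starts.
  static <K, V> Map<K, V> newLruCache(final int maxEntries) {
    final float loadFactor = 0.75f;
    int capacity = (int) Math.ceil(maxEntries / loadFactor) + 1;
    return Collections.synchronizedMap(
        new LinkedHashMap<K, V>(capacity, loadFactor, true /* access order */) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxEntries; // drop the least recently used entry
          }
        });
  }
}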
Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java Mon Apr 23 21:37:55 2012
@@ -29,14 +29,20 @@ import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
import java.net.Socket;
+import java.net.SocketException;
import java.net.SocketTimeoutException;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
@@ -78,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -96,7 +103,6 @@ import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -138,6 +144,7 @@ public class DFSClient implements FSCons
int socketTimeout;
final int writePacketSize;
final FileSystem.Statistics stats;
+ boolean shortCircuitLocalReads;
final int hdfsTimeout; // timeout value for a DFS operation.
final SocketCache socketCache;
@@ -218,6 +225,21 @@ public class DFSClient implements FSCons
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
}
+ /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
+ static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+ DatanodeID datanodeid, Configuration conf, int socketTimeout)
+ throws IOException {
+ InetSocketAddress addr = NetUtils.createSocketAddr(
+ datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+ if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
+ ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
+ }
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
+ ClientDatanodeProtocol.versionID, addr, ugi, conf, NetUtils
+ .getDefaultSocketFactory(conf), socketTimeout);
+ }
+
/**
* Same as this(NameNode.getAddress(conf), conf);
* @see #DFSClient(InetSocketAddress, Configuration)
@@ -290,6 +312,13 @@ public class DFSClient implements FSCons
"Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
+ "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
}
+ // read directly from the block file if configured.
+ this.shortCircuitLocalReads = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Short circuit read is " + shortCircuitLocalReads);
+ }
}
/**
@@ -399,7 +428,6 @@ public class DFSClient implements FSCons
}
}
-
/**
* @see ClientProtocol#getDelegationToken(Text)
*/
@@ -425,6 +453,38 @@ public class DFSClient implements FSCons
}
}
+ private static Set<String> localIpAddresses = Collections
+ .synchronizedSet(new HashSet<String>());
+
+ static boolean isLocalAddress(InetSocketAddress targetAddr) {
+ InetAddress addr = targetAddr.getAddress();
+ if (localIpAddresses.contains(addr.getHostAddress())) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Address " + targetAddr + " is local");
+ }
+ return true;
+ }
+
+ // Check if the address is any local or loop back
+ boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
+
+ // Check if the address is defined on any interface
+ if (!local) {
+ try {
+ local = NetworkInterface.getByInetAddress(addr) != null;
+ } catch (SocketException e) {
+ local = false;
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Address " + targetAddr + (local ? " is local" : " is not local"));
+ }
+ if (local) {
+ localIpAddresses.add(addr.getHostAddress());
+ }
+ return local;
+ }
+
/**
* @see ClientProtocol#cancelDelegationToken(Token)
*/
@@ -1555,4 +1615,29 @@ public class DFSClient implements FSCons
return getClass().getSimpleName() + "[clientName=" + clientName
+ ", ugi=" + ugi + "]";
}
+
+ boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
+ throws IOException {
+ if (shortCircuitLocalReads && DFSClient.isLocalAddress(targetAddr)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Get {@link BlockReader} for short circuited local reads.
+ */
+ BlockReader getLocalBlockReader(
+ String src, Block blk, Token<BlockTokenIdentifier> accessToken,
+ DatanodeInfo chosenNode, long offsetIntoBlock)
+ throws InvalidToken, IOException {
+ try {
+ return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
+ chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
+ - offsetIntoBlock);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(InvalidToken.class,
+ AccessControlException.class);
+ }
+ }
}
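The isLocalAddress test added above proceeds in three steps: a hit in the cached address set, the any-local/loopback check, then a NetworkInterface lookup. Stripped of the cache, the core test is roughly:

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketException;

class LocalAddressSketch {
  // Minimal restatement of DFSClient.isLocalAddress, minus the cache.
  static boolean isLocal(InetSocketAddress target) {
    InetAddress addr = target.getAddress();
    if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()) {
      return true; // 0.0.0.0/:: or 127.0.0.1/::1
    }
    try {
      // Local iff some interface on this host is bound to the address.
      return NetworkInterface.getByInetAddress(addr) != null;
    } catch (SocketException e) {
      return false;
    }
  }
}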
Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Mon Apr 23 21:37:55 2012
@@ -207,6 +207,10 @@ public class DFSConfigKeys extends Commo
public static final long DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 21600000;
public static final String DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";
public static final int DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
+ public static final String DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
+ public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
+ public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
+ public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -237,4 +241,5 @@ public class DFSConfigKeys extends Commo
public static final String DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.https.principal";
public static final String DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
public static final int DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
+ public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
}
Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Mon Apr 23 21:37:55 2012
@@ -35,12 +35,14 @@ import org.apache.hadoop.hdfs.protocol.C
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
@@ -383,9 +385,33 @@ public class DFSInputStream extends FSIn
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
- try {
+ // Try getting a local BlockReader first; if this fails, go via
+ // the datanode.
Block blk = targetBlock.getBlock();
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+ if (dfsClient.shouldTryShortCircuitRead(targetAddr)) {
+ try {
+ blockReader = dfsClient.getLocalBlockReader(src, blk, accessToken,
+ chosenNode, offsetIntoBlock);
+ return chosenNode;
+ } catch (AccessControlException ex) {
+ DFSClient.LOG.warn("Short circuit access failed ", ex);
+ //Disable short circuit reads
+ dfsClient.shortCircuitLocalReads = false;
+ } catch (IOException ex) {
+ if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
+ /* Get a new access token and retry. */
+ refetchToken--;
+ fetchBlockAt(target);
+ continue;
+ } else {
+ DFSClient.LOG.info("Failed to read block " + targetBlock.getBlock()
+ + " on local machine" + StringUtils.stringifyException(ex));
+ DFSClient.LOG.info("Try reading via the datanode on " + targetAddr);
+ }
+ }
+ }
+ try {
blockReader = getBlockReader(
targetAddr, src, blk,
@@ -394,20 +420,7 @@ public class DFSInputStream extends FSIn
buffersize, verifyChecksum, dfsClient.clientName);
return chosenNode;
} catch (IOException ex) {
- if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
- DFSClient.LOG.info("Will fetch a new access token and retry, "
- + "access token was invalid when connecting to " + targetAddr
- + " : " + ex);
- /*
- * Get a new access token and retry. Retry is needed in 2 cases. 1)
- * When both NN and DN re-started while DFSClient holding a cached
- * access token. 2) In the case that NN fails to update its
- * access key at pre-set interval (by a wide margin) and
- * subsequently restarts. In this case, DN re-registers itself with
- * NN and receives a new access key, but DN will delete the old
- * access key from its memory since it's considered expired based on
- * the estimated expiration date.
- */
+ if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
refetchToken--;
fetchBlockAt(target);
} else {
@@ -610,14 +623,27 @@ public class DFSInputStream extends FSIn
try {
Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
-
int len = (int) (end - start + 1);
+ if (dfsClient.shouldTryShortCircuitRead(targetAddr)) {
+ try {
+ reader = dfsClient.getLocalBlockReader(src, block.getBlock(),
+ blockToken, chosenNode, start);
+ } catch (AccessControlException ex) {
+ DFSClient.LOG.warn("Short circuit access failed ", ex);
+ //Disable short circuit reads
+ dfsClient.shortCircuitLocalReads = false;
+ continue;
+ }
+ } else {
+ // go to the datanode
reader = getBlockReader(targetAddr, src,
block.getBlock(),
blockToken,
start, len, buffersize,
verifyChecksum, dfsClient.clientName);
+ }
+
int nread = reader.readAll(buf, offset, len);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
@@ -630,10 +656,7 @@ public class DFSInputStream extends FSIn
e.getPos() + " from " + chosenNode.getName());
dfsClient.reportChecksumFailure(src, block.getBlock(), chosenNode);
} catch (IOException e) {
- if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
- DFSClient.LOG.info("Will get a new access token and retry, "
- + "access token was invalid when connecting to " + targetAddr
- + " : " + e);
+ if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
refetchToken--;
fetchBlockAt(block.getStartOffset());
continue;
@@ -723,7 +746,7 @@ public class DFSInputStream extends FSIn
try {
// The OP_READ_BLOCK request is sent as we make the BlockReader
BlockReader reader =
- BlockReader.newBlockReader(sock, file, block,
+ RemoteBlockReader.newBlockReader(sock, file, block,
blockToken,
startOffset, len,
bufferSize, verifyChecksum,
@@ -740,7 +763,6 @@ public class DFSInputStream extends FSIn
throw err;
}
-
/**
* Read bytes starting from the specified position.
*
@@ -914,6 +936,33 @@ public class DFSInputStream extends FSIn
}
/**
+ * Should the block access token be refetched on an exception
+ *
+ * @param ex Exception received
+ * @param targetAddr Target datanode address from where exception was received
+ * @return true if block access token has expired or invalid and it should be
+ * refetched
+ */
+ private static boolean tokenRefetchNeeded(IOException ex,
+ InetSocketAddress targetAddr) {
+ /*
+ * Get a new access token and retry. Retry is needed in 2 cases. 1) When
+ * both NN and DN re-started while DFSClient holding a cached access token.
+ * 2) In the case that NN fails to update its access key at pre-set interval
+ * (by a wide margin) and subsequently restarts. In this case, DN
+ * re-registers itself with NN and receives a new access key, but DN will
+ * delete the old access key from its memory since it's considered expired
+ * based on the estimated expiration date.
+ */
+ if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
+ DFSClient.LOG.info("Access token was invalid when connecting to " + targetAddr
+ + " : " + ex);
+ return true;
+ }
+ return false;
+ }
+
+ /**
* Pick the best node from which to stream the data.
* Entries in <i>nodes</i> are already in the priority order
*/
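tokenRefetchNeeded centralizes the retry decision that both the streaming and pread paths now share: on an invalid-token error, refetch the block locations (and with them a fresh token) and retry once. The shape of that pattern in isolation (a sketch; the class and method names are illustrative stand-ins for the reader factory, tokenRefetchNeeded and fetchBlockAt):

import java.io.IOException;

abstract class TokenRetrySketch<T> {
  abstract T attempt() throws IOException;          // e.g. open a BlockReader
  abstract boolean isTokenError(IOException ex);    // cf. tokenRefetchNeeded
  abstract void refreshToken() throws IOException;  // cf. fetchBlockAt

  final T run() throws IOException {
    int refetchToken = 1; // illustrative budget; the patch decrements similarly
    while (true) {
      try {
        return attempt();
      } catch (IOException ex) {
        if (refetchToken-- > 0 && isTokenError(ex)) {
          refreshToken(); // get a fresh token, then retry
        } else {
          throw ex;
        }
      }
    }
  }
}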
Added: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1329468&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (added)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Mon Apr 23 21:37:55 2012
@@ -0,0 +1,542 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+/** This is a wrapper around a connection to a datanode
+ * and understands checksums, offsets etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ * <dd>The hdfs block, typically large (~64MB).
+ * </dd>
+ * <dt>chunk</dt>
+ * <dd>A block is divided into chunks, each comes with a checksum.
+ * We want transfers to be chunk-aligned, to be able to
+ * verify checksums.
+ * </dd>
+ * <dt>packet</dt>
+ * <dd>A grouping of chunks used for transport. It contains a
+ * header, followed by checksum data, followed by real data.
+ * </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
+ */
+@InterfaceAudience.Private
+public class RemoteBlockReader extends FSInputChecker implements BlockReader {
+ public static final Log LOG = LogFactory.getLog(RemoteBlockReader.class);
+
+ Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+ private DataInputStream in;
+ private DataChecksum checksum;
+
+ /** offset in block of the last chunk received */
+ private long lastChunkOffset = -1;
+ private long lastChunkLen = -1;
+ private long lastSeqNo = -1;
+
+ /** offset in block where reader wants to actually read */
+ private long startOffset;
+
+ /** offset in block of first chunk - may be less than startOffset
+ if startOffset is not chunk-aligned */
+ private long firstChunkOffset;
+
+ private int bytesPerChecksum;
+ private int checksumSize;
+
+ /**
+ * The total number of bytes we need to transfer from the DN.
+ * This is the amount that the user has requested plus some padding
+ * at the beginning so that the read can begin on a chunk boundary.
+ */
+ private long bytesNeededToFinish;
+
+ private boolean eos = false;
+ private boolean sentStatusCode = false;
+
+ byte[] skipBuf = null;
+ ByteBuffer checksumBytes = null;
+ /** Amount of unread data in the current received packet */
+ int dataLeft = 0;
+
+ /* FSInputChecker interface */
+
+ /* same interface as inputStream java.io.InputStream#read()
+ * used by DFSInputStream#read()
+ * This violates one rule when there is a checksum error:
+ * "Read should not modify user buffer before successful read"
+ * because it first reads the data to user buffer and then checks
+ * the checksum.
+ */
+ @Override
+ public synchronized int read(byte[] buf, int off, int len)
+ throws IOException {
+
+ // This has to be set here, *before* the skip, since we can
+ // hit EOS during the skip, in the case that our entire read
+ // is smaller than the checksum chunk.
+ boolean eosBefore = eos;
+
+ //for the first read, skip the extra bytes at the front.
+ if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+ // Skip these bytes. But don't call this.skip()!
+ int toSkip = (int)(startOffset - firstChunkOffset);
+ if ( skipBuf == null ) {
+ skipBuf = new byte[bytesPerChecksum];
+ }
+ if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
+ // should never happen
+ throw new IOException("Could not skip required number of bytes");
+ }
+ }
+
+ int nRead = super.read(buf, off, len);
+
+ // if eos was set by the read above, send a status code to the DN
+ if ( dnSock != null && eos && !eosBefore && nRead >= 0) {
+ if (needChecksum()) {
+ sendReadResult(dnSock, CHECKSUM_OK);
+ } else {
+ sendReadResult(dnSock, SUCCESS);
+ }
+ }
+ return nRead;
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ /* How can we make sure we don't throw a ChecksumException, at least
+ * in the majority of cases? This implementation can throw one. */
+ if ( skipBuf == null ) {
+ skipBuf = new byte[bytesPerChecksum];
+ }
+
+ long nSkipped = 0;
+ while ( nSkipped < n ) {
+ int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+ int ret = read(skipBuf, 0, toSkip);
+ if ( ret <= 0 ) {
+ return nSkipped;
+ }
+ nSkipped += ret;
+ }
+ return nSkipped;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new IOException("read() is not expected to be invoked. " +
+ "Use read(buf, off, len) instead.");
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ /* Checksum errors are handled outside the BlockReader.
+ * DFSInputStream does not always call 'seekToNewSource'. In the
+ * case of pread(), it just tries a different replica without seeking.
+ */
+ return false;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ throw new IOException("Seek() is not supported in BlockInputChecker");
+ }
+
+ @Override
+ protected long getChunkPosition(long pos) {
+ throw new RuntimeException("getChunkPosition() is not supported, " +
+ "since seek is not required");
+ }
+
+ /**
+ * Makes sure that checksumBytes has enough capacity
+ * and limit is set to the number of checksum bytes needed
+ * to be read.
+ */
+ private void adjustChecksumBytes(int dataLen) {
+ int requiredSize =
+ ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
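+ // e.g. dataLen = 1000, bytesPerChecksum = 512, checksumSize = 4 (CRC32):
+ // ceil(1000/512) = 2 chunks, so requiredSize = 8 bytes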
+ if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+ checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
+ } else {
+ checksumBytes.clear();
+ }
+ checksumBytes.limit(requiredSize);
+ }
+
+ @Override
+ protected synchronized int readChunk(long pos, byte[] buf, int offset,
+ int len, byte[] checksumBuf)
+ throws IOException {
+ // Read one chunk.
+ if (eos) {
+ // Already hit EOF
+ return -1;
+ }
+
+ // Read one DATA_CHUNK.
+ long chunkOffset = lastChunkOffset;
+ if ( lastChunkLen > 0 ) {
+ chunkOffset += lastChunkLen;
+ }
+
+ // pos is relative to the start of the first chunk of the read.
+ // chunkOffset is relative to the start of the block.
+ // This makes sure that the read passed from FSInputChecker is
+ // for the same chunk we expect to be reading from the DN.
+ if ( (pos + firstChunkOffset) != chunkOffset ) {
+ throw new IOException("Mismatch in pos : " + pos + " + " +
+ firstChunkOffset + " != " + chunkOffset);
+ }
+
+ // Read next packet if the previous packet has been read completely.
+ if (dataLeft <= 0) {
+ //Read packet headers.
+ PacketHeader header = new PacketHeader();
+ header.readFields(in);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DFSClient readChunk got header " + header);
+ }
+
+ // Sanity check the lengths
+ if (!header.sanityCheck(lastSeqNo)) {
+ throw new IOException("BlockReader: error in packet header " +
+ header);
+ }
+
+ lastSeqNo = header.getSeqno();
+ dataLeft = header.getDataLen();
+ adjustChecksumBytes(header.getDataLen());
+ if (header.getDataLen() > 0) {
+ IOUtils.readFully(in, checksumBytes.array(), 0,
+ checksumBytes.limit());
+ }
+ }
+
+ // Sanity checks
+ assert len >= bytesPerChecksum;
+ assert checksum != null;
+ assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+
+ int checksumsToRead, bytesToRead;
+
+ if (checksumSize > 0) {
+
+ // How many chunks left in our packet - this is a ceiling
+ // since we may have a partial chunk at the end of the file
+ int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+
+ // How many chunks we can fit in the data buffer
+ // - note this is a floor since we always read full chunks
+ int chunksCanFit = Math.min(len / bytesPerChecksum,
+ checksumBuf.length / checksumSize);
+
+ // How many chunks should we read
+ checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+ // How many bytes should we actually read
+ bytesToRead = Math.min(
+ checksumsToRead * bytesPerChecksum, // full chunks
+ dataLeft); // in case we have a partial
+ } else {
+ // no checksum
+ bytesToRead = Math.min(dataLeft, len);
+ checksumsToRead = 0;
+ }
+
+ if ( bytesToRead > 0 ) {
+ // Assert we have enough space
+ assert bytesToRead <= len;
+ assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+ assert checksumBuf.length >= checksumSize * checksumsToRead;
+ IOUtils.readFully(in, buf, offset, bytesToRead);
+ checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
+ }
+
+ dataLeft -= bytesToRead;
+ assert dataLeft >= 0;
+
+ lastChunkOffset = chunkOffset;
+ lastChunkLen = bytesToRead;
+
+ // If there's no data left in the current packet after satisfying
+ // this read, and we have satisfied the client read, we expect
+ // an empty packet header from the DN to signify this.
+ // Note that pos + bytesToRead may in fact be greater since the
+ // DN finishes off the entire last chunk.
+ if (dataLeft == 0 &&
+ pos + bytesToRead >= bytesNeededToFinish) {
+
+ // Read header
+ int packetLen = in.readInt();
+ long offsetInBlock = in.readLong();
+ long seqno = in.readLong();
+ boolean lastPacketInBlock = in.readBoolean();
+ int dataLen = in.readInt();
+
+ if (!lastPacketInBlock ||
+ dataLen != 0) {
+ throw new IOException("Expected empty end-of-read packet! Header: " +
+ "(packetLen : " + packetLen +
+ ", offsetInBlock : " + offsetInBlock +
+ ", seqno : " + seqno +
+ ", lastInBlock : " + lastPacketInBlock +
+ ", dataLen : " + dataLen);
+ }
+
+ eos = true;
+ }
+
+ if ( bytesToRead == 0 ) {
+ return -1;
+ }
+
+ return bytesToRead;
+ }
+
+ private RemoteBlockReader( String file, long blockId, DataInputStream in,
+ DataChecksum checksum, boolean verifyChecksum,
+ long startOffset, long firstChunkOffset,
+ long bytesToRead,
+ Socket dnSock ) {
+ // Path is used only for printing block and file information in debug
+ super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
+ 1, verifyChecksum,
+ checksum.getChecksumSize() > 0? checksum : null,
+ checksum.getBytesPerChecksum(),
+ checksum.getChecksumSize());
+
+ this.dnSock = dnSock;
+ this.in = in;
+ this.checksum = checksum;
+ this.startOffset = Math.max( startOffset, 0 );
+
+ // The total number of bytes that we need to transfer from the DN is
+ // the amount that the user wants (bytesToRead), plus the padding at
+ // the beginning in order to chunk-align. Note that the DN may elect
+ // to send more than this amount if the read starts/ends mid-chunk.
+ this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+
+ this.firstChunkOffset = firstChunkOffset;
+ lastChunkOffset = firstChunkOffset;
+ lastChunkLen = -1;
+
+ bytesPerChecksum = this.checksum.getBytesPerChecksum();
+ checksumSize = this.checksum.getChecksumSize();
+ }
+ /**
+ * Package-private constructor; delegates to FSInputChecker.
+ */
+ RemoteBlockReader(Path file, int numRetries) {
+ super(file, numRetries);
+ }
+
+ protected RemoteBlockReader(Path file, int numRetries, DataChecksum checksum,
+ boolean verifyChecksum) {
+ super(file,
+ numRetries,
+ verifyChecksum,
+ checksum.getChecksumSize() > 0? checksum : null,
+ checksum.getBytesPerChecksum(),
+ checksum.getChecksumSize());
+ }
+
+ public static RemoteBlockReader newBlockReader(Socket sock, String file,
+ Block block, Token<BlockTokenIdentifier> blockToken,
+ long startOffset, long len, int bufferSize) throws IOException {
+ return newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize,
+ true);
+ }
+
+ /** Create a new BlockReader with an explicit checksum-verification flag. */
+ public static RemoteBlockReader newBlockReader( Socket sock, String file,
+ Block block,
+ Token<BlockTokenIdentifier> blockToken,
+ long startOffset, long len,
+ int bufferSize, boolean verifyChecksum)
+ throws IOException {
+ return newBlockReader(sock, file, block, blockToken, startOffset,
+ len, bufferSize, verifyChecksum, "");
+ }
+
+ /**
+ * Create a new BlockReader specifically to satisfy a read.
+ * This method also sends the OP_READ_BLOCK request.
+ *
+ * @param sock An established Socket to the DN. The BlockReader will not close it normally
+ * @param file File location
+ * @param block The block object
+ * @param blockToken The block token for security
+ * @param startOffset The read offset, relative to block head
+ * @param len The number of bytes to read
+ * @param bufferSize The IO buffer size (not the client buffer size)
+ * @param verifyChecksum Whether to verify checksum
+ * @param clientName Client name
+ * @return New BlockReader instance, or null on error.
+ */
+ public static RemoteBlockReader newBlockReader( Socket sock, String file,
+ Block block,
+ Token<BlockTokenIdentifier> blockToken,
+ long startOffset, long len,
+ int bufferSize, boolean verifyChecksum,
+ String clientName)
+ throws IOException {
+ // in and out will be closed when sock is closed (by the caller)
+ DataTransferProtocol.Sender.opReadBlock(
+ new DataOutputStream(new BufferedOutputStream(
+ NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
+ block, startOffset, len, clientName, blockToken);
+
+ //
+ // Get bytes in block, set streams
+ //
+
+ DataInputStream in = new DataInputStream(
+ new BufferedInputStream(NetUtils.getInputStream(sock),
+ bufferSize));
+
+ DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
+ if (status != SUCCESS) {
+ if (status == ERROR_ACCESS_TOKEN) {
+ throw new InvalidBlockTokenException(
+ "Got access token error for OP_READ_BLOCK, self="
+ + sock.getLocalSocketAddress() + ", remote="
+ + sock.getRemoteSocketAddress() + ", for file " + file
+ + ", for block " + block.getBlockId()
+ + "_" + block.getGenerationStamp());
+ } else {
+ throw new IOException("Got error for OP_READ_BLOCK, self="
+ + sock.getLocalSocketAddress() + ", remote="
+ + sock.getRemoteSocketAddress() + ", for file " + file
+ + ", for block " + block.getBlockId() + "_"
+ + block.getGenerationStamp());
+ }
+ }
+ DataChecksum checksum = DataChecksum.newDataChecksum( in );
+ // Should we warn when we get CHECKSUM_NULL?
+
+ // Read the first chunk offset.
+ long firstChunkOffset = in.readLong();
+
+ if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+ firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
+ throw new IOException("BlockReader: error in first chunk offset (" +
+ firstChunkOffset + ") startOffset is " +
+ startOffset + " for file " + file);
+ }
+
+ return new RemoteBlockReader(file, block.getBlockId(), in, checksum,
+ verifyChecksum, startOffset, firstChunkOffset, len, sock);
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ startOffset = -1;
+ checksum = null;
+ if (dnSock != null) {
+ dnSock.close();
+ }
+
+ // in will be closed when its Socket is closed.
+ }
+
+ /** Similar to readFully(), but reads only as much as is available,
+ * and exposes the protected readFully() to callers.
+ */
+ public int readAll(byte[] buf, int offset, int len) throws IOException {
+ return readFully(this, buf, offset, len);
+ }
+
+ /**
+ * Take the socket used to talk to the DN.
+ */
+ public Socket takeSocket() {
+ assert hasSentStatusCode() :
+ "BlockReader shouldn't give back sockets mid-read";
+ Socket res = dnSock;
+ dnSock = null;
+ return res;
+ }
+
+ /**
+ * Whether the BlockReader has reached the end of its input stream
+ * and successfully sent a status code back to the datanode.
+ */
+ public boolean hasSentStatusCode() {
+ return sentStatusCode;
+ }
+
+ /**
+ * When the reader reaches end of the read, it sends a status response
+ * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+ * closing our connection (which we will re-open), but won't affect
+ * data correctness.
+ */
+ void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
+ assert !sentStatusCode : "already sent status code to " + sock;
+ try {
+ OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
+ statusCode.writeOutputStream(out);
+ out.flush();
+ sentStatusCode = true;
+ } catch (IOException e) {
+ // It's ok not to be able to send this. But something is probably wrong.
+ LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+ sock.getInetAddress() + ": " + e.getMessage());
+ }
+ }
+
+ // File name to print when accessing a block directly from servlets
+ public static String getFileName(final InetSocketAddress s,
+ final long blockId) {
+ return s.toString() + ":" + blockId;
+ }
+}
Propchange: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
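
For orientation, a minimal sketch of how a caller might drive this reader
end-to-end. The wrapper class below is hypothetical, error handling is
omitted, and the located block and datanode address are assumed to have
been obtained from the namenode beforehand:

// ReadSketch.java - illustrative only, not part of this commit.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.apache.hadoop.hdfs.RemoteBlockReader;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.net.NetUtils;

public class ReadSketch {
  /** Read a whole block from the given datanode into memory. */
  static byte[] readBlock(LocatedBlock lb, InetSocketAddress dnAddr,
                          String file) throws IOException {
    Socket sock = new Socket();
    NetUtils.connect(sock, dnAddr, HdfsConstants.READ_TIMEOUT);
    Block blk = lb.getBlock();
    RemoteBlockReader reader = RemoteBlockReader.newBlockReader(
        sock, file, blk, lb.getBlockToken(),
        0L, blk.getNumBytes(), 4096);
    byte[] buf = new byte[(int) blk.getNumBytes()];
    reader.readAll(buf, 0, buf.length);  // reads as much as is available
    reader.close();                      // also closes the datanode socket
    return buf;
  }
}

A caller that wants to recycle the connection would instead call
takeSocket() once the status code has been sent, rather than close().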
Added: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java?rev=1329468&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java (added)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java Mon Apr 23 21:37:55 2012
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A block and the full path information to the block data file and
+ * the metadata file stored on the local file system.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockLocalPathInfo implements Writable {
+ static final WritableFactory FACTORY = new WritableFactory() {
+ public Writable newInstance() { return new BlockLocalPathInfo(); }
+ };
+ static { // register a ctor
+ WritableFactories.setFactory(BlockLocalPathInfo.class, FACTORY);
+ }
+
+ private Block block;
+ private String localBlockPath = ""; // local file storing the data
+ private String localMetaPath = ""; // local file storing the checksum
+
+ public BlockLocalPathInfo() {}
+
+ /**
+ * Constructs BlockLocalPathInfo.
+ * @param b The block corresponding to this local path info.
+ * @param file Block data file.
+ * @param metafile Metadata file for the block.
+ */
+ public BlockLocalPathInfo(Block b, String file, String metafile) {
+ block = b;
+ localBlockPath = file;
+ localMetaPath = metafile;
+ }
+
+ /**
+ * Get the Block data file.
+ * @return Block data file.
+ */
+ public String getBlockPath() {return localBlockPath;}
+
+ /**
+ * Get the Block metadata file.
+ * @return Block metadata file.
+ */
+ public String getMetaPath() {return localMetaPath;}
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ block.write(out);
+ Text.writeString(out, localBlockPath);
+ Text.writeString(out, localMetaPath);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ block = new Block();
+ block.readFields(in);
+ localBlockPath = Text.readString(in);
+ localMetaPath = Text.readString(in);
+ }
+
+ /**
+ * Get number of bytes in the block.
+ * @return Number of bytes in the block.
+ */
+ public long getNumBytes() {
+ return block.getNumBytes();
+ }
+}
Propchange: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
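
The write/readFields pair above defines the wire format used when a
datanode hands these paths back over RPC. A round-trip sketch, with
hypothetical block numbers and paths:

// RoundTripSketch.java - illustrative only, not part of this commit.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;

public class RoundTripSketch {
  public static void main(String[] args) throws IOException {
    BlockLocalPathInfo info = new BlockLocalPathInfo(
        new Block(1L, 1024L, 100L),              // hypothetical block
        "/data/current/blk_1",                   // hypothetical data file
        "/data/current/blk_1_100.meta");         // hypothetical meta file
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    info.write(new DataOutputStream(bos));       // serialize
    BlockLocalPathInfo copy = new BlockLocalPathInfo();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray())));  // deserialize
    System.out.println(copy.getBlockPath());     // /data/current/blk_1
  }
}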
Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Mon Apr 23 21:37:55 2012
@@ -26,20 +26,51 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.Token;
/** A client-datanode protocol for block recovery
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
+@KerberosInfo(
+ serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
@TokenInfo(BlockTokenSelector.class)
public interface ClientDatanodeProtocol extends VersionedProtocol {
public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
/**
- * 6: recoverBlock() removed.
+ * 7: added getBlockLocalPathInfo.
*/
- public static final long versionID = 6L;
+ public static final long versionID = 7L;
/** Return the visible length of a replica. */
long getReplicaVisibleLength(Block b) throws IOException;
+
+ /**
+ * Retrieves the path names of the block file and metadata file stored on the
+ * local file system.
+ *
+ * In order for this method to work, one of the following should be satisfied:
+ * <ul>
+ * <li>
+ * The client user must be configured at the datanode to be able to use this
+ * method.</li>
+ * <li>
+ * When security is enabled, kerberos authentication must be used to connect
+ * to the datanode.</li>
+ * </ul>
+ *
+ * @param block
+ * the specified block on the local datanode
+ * @param token
+ * the block access token.
+ * @return the BlockLocalPathInfo of a block
+ * @throws IOException
+ * on error
+ */
+ BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+ Token<BlockTokenIdentifier> token) throws IOException;
}
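
Combined with BlockLocalPathInfo and the now-public BlockMetadataHeader
(below), this call is what lets a client on the same node bypass the
data-transfer protocol. A hedged sketch, assuming a ClientDatanodeProtocol
proxy has already been obtained over RPC (proxy creation is elided):

// LocalOpenSketch.java - illustrative only, not part of this commit.
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;

public class LocalOpenSketch {
  /** Open a block file directly on the local file system. */
  static FileInputStream openLocal(ClientDatanodeProtocol datanode, Block blk,
      Token<BlockTokenIdentifier> token) throws IOException {
    // Ask the co-located datanode where the block and meta files live.
    BlockLocalPathInfo info = datanode.getBlockLocalPathInfo(blk, token);
    // The chunk size and checksum type come from the .meta file header;
    // a local reader would use this checksum to verify each chunk.
    DataInputStream meta =
        new DataInputStream(new FileInputStream(info.getMetaPath()));
    DataChecksum checksum = BlockMetadataHeader.readHeader(meta).getChecksum();
    meta.close();
    return new FileInputStream(info.getBlockPath());
  }
}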
Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Mon Apr 23 21:37:55 2012
@@ -26,8 +26,8 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URL;
import java.net.URLEncoder;
-import java.util.Arrays;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -43,6 +43,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -192,8 +193,8 @@ public class JspHelper {
long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
// Use the block name for file name.
- String file = BlockReader.getFileName(addr, blockId);
- BlockReader blockReader = BlockReader.newBlockReader(s, file,
+ String file = RemoteBlockReader.getFileName(addr, blockId);
+ BlockReader blockReader = RemoteBlockReader.newBlockReader(s, file,
new Block(blockId, 0, genStamp), blockToken,
offsetIntoBlock, amtToRead, conf.getInt("io.file.buffer.size", 4096));
Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Mon Apr 23 21:37:55 2012
@@ -26,6 +26,8 @@ import java.io.IOException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
/**
@@ -33,7 +35,9 @@ import org.apache.hadoop.util.DataChecks
* This is not related to the Block related functionality in Namenode.
* The biggest part of data block metadata is CRC for the block.
*/
-class BlockMetadataHeader {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMetadataHeader {
static final short METADATA_VERSION = FSDataset.METADATA_VERSION;
@@ -50,11 +54,13 @@ class BlockMetadataHeader {
this.version = version;
}
- short getVersion() {
+ /** Get the version */
+ public short getVersion() {
return version;
}
- DataChecksum getChecksum() {
+ /** Get the checksum */
+ public DataChecksum getChecksum() {
return checksum;
}
@@ -65,7 +71,7 @@ class BlockMetadataHeader {
* @return Metadata Header
* @throws IOException
*/
- static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+ public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
return readHeader(in.readShort(), in);
}