Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2011/11/23 01:03:36 UTC
svn commit: r1205243 - in /hadoop/common/branches/branch-0.20-security: ./
src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/
src/hdfs/org/apache/hadoop/hdfs/server/datanode/
src/hdfs/org/apache/hadoop/hdfs/server/datanode/metri...
Author: jitendra
Date: Wed Nov 23 00:03:30 2011
New Revision: 1205243
URL: http://svn.apache.org/viewvc?rev=1205243&view=rev
Log:
HDFS-2246. Shortcut local client reads to a datanode's files directly. Contributed by Andrew Purtell, Suresh and Jitendra.
Added:
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java
hadoop/common/branches/branch-0.20-security/src/test/commit-tests
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Wed Nov 23 00:03:30 2011
@@ -90,6 +90,9 @@ Release 0.20.206.0 - unreleased
MAPREDUCE-3419. Don't mark exited TT threads as dead in MiniMRCluster (eli)
+ HDFS-2246. Shortcut local client reads to a datanode's files directly.
+ (Andrew Purtell, Suresh, Jitendra)
+
Release 0.20.205.1 - unreleased
NEW FEATURES
Added: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1205243&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java Wed Nov 23 00:03:30 2011
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient.BlockReader;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+/**
+ * BlockReaderLocal enables local short circuited reads. If the DFS client is on
+ * the same machine as the datanode, then the client can read files directly
+ * from the local file system rather than going through the datanode for better
+ * performance. <br>
+ * {@link BlockReaderLocal} works as follows:
+ * <ul>
+ * <li>The user running the client must be configured at the datanode to be
+ * allowed short circuit reads (dfs.block.local-path-access.user).</li>
+ * <li>The client gets the path to the file where the block is stored using the
+ * {@link ClientDatanodeProtocol#getBlockLocalPathInfo(Block, Token)} RPC call.</li>
+ * <li>The client uses kerberos authentication to connect to the datanode over
+ * RPC, if security is enabled.</li>
+ * </ul>
+ */
+class BlockReaderLocal extends BlockReader {
+ public static final Log LOG = LogFactory.getLog(DFSClient.class);
+
+ //Stores the cache and proxy for a local datanode.
+ private static class LocalDatanodeInfo {
+ private ClientDatanodeProtocol proxy = null;
+ private final Map<Block, BlockLocalPathInfo> cache;
+
+ LocalDatanodeInfo() {
+ final int cacheSize = 10000;
+ final float hashTableLoadFactor = 0.75f;
+ int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
+ cache = Collections
+ .synchronizedMap(new LinkedHashMap<Block, BlockLocalPathInfo>(
+ hashTableCapacity, hashTableLoadFactor, true) {
+ private static final long serialVersionUID = 1;
+
+ @Override
+ protected boolean removeEldestEntry(
+ Map.Entry<Block, BlockLocalPathInfo> eldest) {
+ return size() > cacheSize;
+ }
+ });
+ }
+
+ private synchronized ClientDatanodeProtocol getDatanodeProxy(
+ DatanodeInfo node, Configuration conf, int socketTimeout)
+ throws IOException {
+ if (proxy == null) {
+ proxy = DFSClient.createClientDatanodeProtocolProxy(node, conf,
+ socketTimeout);
+ }
+ return proxy;
+ }
+
+ private synchronized void resetDatanodeProxy() {
+ if (null != proxy) {
+ RPC.stopProxy(proxy);
+ proxy = null;
+ }
+ }
+
+ private BlockLocalPathInfo getBlockLocalPathInfo(Block b) {
+ return cache.get(b);
+ }
+
+ private void setBlockLocalPathInfo(Block b, BlockLocalPathInfo info) {
+ cache.put(b, info);
+ }
+
+ private void removeBlockLocalPathInfo(Block b) {
+ cache.remove(b);
+ }
+ }
+
+ // Multiple datanodes could be running on the local machine. Store proxies in
+ // a map keyed by the ipc port of the datanode.
+ private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+
+ private FileInputStream dataIn; // reader for the data file
+ private FileInputStream checksumIn; // reader for the checksum file
+
+ /**
+ * The only way this object can be instantiated.
+ */
+ static BlockReaderLocal newBlockReader(Configuration conf,
+ String file, Block blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
+ int socketTimeout, long startOffset, long length) throws IOException {
+
+ LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
+ // check the cache first
+ BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
+ if (pathinfo == null) {
+ pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
+ }
+
+ // Check to see if the file exists. It may so happen that the
+ // HDFS file has been deleted and this block-lookup is occurring
+ // on behalf of a new HDFS file. This time, the block file could
+ // be residing in a different portion of the dfs.data.dir directory.
+ // In this case, we remove this entry from the cache. The next
+ // call to this method will re-populate the cache.
+ FileInputStream dataIn = null;
+ FileInputStream checksumIn = null;
+ BlockReaderLocal localBlockReader = null;
+ boolean skipChecksum = shortCircuitChecksum(conf);
+ try {
+ // get a local file system
+ File blkfile = new File(pathinfo.getBlockPath());
+ dataIn = new FileInputStream(blkfile);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
+ + blkfile.length() + " startOffset " + startOffset + " length "
+ + length + " short circuit checksum " + skipChecksum);
+ }
+
+ if (!skipChecksum) {
+ // get the metadata file
+ File metafile = new File(pathinfo.getMetaPath());
+ checksumIn = new FileInputStream(metafile);
+
+ // read and handle the common header here. For now just a version
+ BlockMetadataHeader header = BlockMetadataHeader
+ .readHeader(new DataInputStream(checksumIn));
+ short version = header.getVersion();
+ if (version != FSDataset.METADATA_VERSION) {
+ LOG.warn("Wrong version (" + version + ") for metadata file for "
+ + blk + " ignoring ...");
+ }
+ DataChecksum checksum = header.getChecksum();
+ localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
+ pathinfo, checksum, true, dataIn, checksumIn);
+ } else {
+ localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
+ pathinfo, dataIn);
+ }
+ } catch (IOException e) {
+ // remove from cache
+ localDatanodeInfo.removeBlockLocalPathInfo(blk);
+ DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk +
+ " from cache because local file " + pathinfo.getBlockPath() +
+ " could not be opened.");
+ throw e;
+ } finally {
+ if (localBlockReader == null) {
+ if (dataIn != null) {
+ dataIn.close();
+ }
+ if (checksumIn != null) {
+ checksumIn.close();
+ }
+ }
+ }
+ return localBlockReader;
+ }
+
+ private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
+ LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
+ if (ldInfo == null) {
+ ldInfo = new LocalDatanodeInfo();
+ localDatanodeInfoMap.put(port, ldInfo);
+ }
+ return ldInfo;
+ }
+
+ private static BlockLocalPathInfo getBlockPathInfo(Block blk,
+ DatanodeInfo node, Configuration conf, int timeout,
+ Token<BlockTokenIdentifier> token) throws IOException {
+ LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.ipcPort);
+ BlockLocalPathInfo pathinfo = null;
+ ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
+ conf, timeout);
+ try {
+ // make RPC to local datanode to find local pathnames of blocks
+ pathinfo = proxy.getBlockLocalPathInfo(blk, token);
+ if (pathinfo != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cached location of block " + blk + " as " + pathinfo);
+ }
+ localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
+ }
+ } catch (IOException e) {
+ localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
+ throw e;
+ }
+ return pathinfo;
+ }
+
+ private static boolean shortCircuitChecksum(Configuration conf) {
+ return conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+ }
+
+ private BlockReaderLocal(Configuration conf, String hdfsfile, Block block,
+ Token<BlockTokenIdentifier> token, long startOffset, long length,
+ BlockLocalPathInfo pathinfo, FileInputStream dataIn) throws IOException {
+ super(
+ new Path("/blk_" + block.getBlockId() + ":of:" + hdfsfile) /*too non path-like?*/,
+ 1);
+ this.startOffset = startOffset;
+ this.dataIn = dataIn;
+ long toSkip = startOffset;
+ while (toSkip > 0) {
+ long skipped = dataIn.skip(toSkip);
+ if (skipped == 0) {
+ throw new IOException("Couldn't initialize input stream");
+ }
+ toSkip -= skipped;
+ }
+ }
+
+ private BlockReaderLocal(Configuration conf, String hdfsfile, Block block,
+ Token<BlockTokenIdentifier> token, long startOffset, long length,
+ BlockLocalPathInfo pathinfo, DataChecksum checksum, boolean verifyChecksum,
+ FileInputStream dataIn, FileInputStream checksumIn) throws IOException {
+ super(
+ new Path("/blk_" + block.getBlockId() + ":of:" + hdfsfile) /*too non path-like?*/,
+ 1,
+ checksum,
+ verifyChecksum);
+ this.startOffset = startOffset;
+ this.dataIn = dataIn;
+ this.checksumIn = checksumIn;
+ this.checksum = checksum;
+
+ long blockLength = pathinfo.getNumBytes();
+
+ /* If bytesPerChecksum is very large, then the metadata file
+ * is most likely corrupted. For now just truncate bytesPerChecksum to
+ * blockLength.
+ */
+ bytesPerChecksum = checksum.getBytesPerChecksum();
+ if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
+ checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
+ Math.max((int) blockLength, 10 * 1024 * 1024));
+ bytesPerChecksum = checksum.getBytesPerChecksum();
+ }
+
+ checksumSize = checksum.getChecksumSize();
+
+ long endOffset = blockLength;
+ if (startOffset < 0 || startOffset > endOffset
+ || (length + startOffset) > endOffset) {
+ String msg = " Offset " + startOffset + " and length " + length
+ + " don't match block " + block + " ( blockLen " + endOffset + " )";
+ LOG.warn("BlockReaderLocal requested with incorrect offset: " + msg);
+ throw new IOException(msg);
+ }
+
+ firstChunkOffset = (startOffset - (startOffset % bytesPerChecksum));
+
+ if (length >= 0) {
+ // Make sure endOffset points to end of a checksumed chunk.
+ long tmpLen = startOffset + length;
+ if (tmpLen % bytesPerChecksum != 0) {
+ tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+ }
+ if (tmpLen < endOffset) {
+ endOffset = tmpLen;
+ }
+ }
+
+ // seek to the right offsets
+ if (firstChunkOffset > 0) {
+ dataIn.getChannel().position(firstChunkOffset);
+
+ long checksumSkip = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+ // dataIn was positioned above; skip the checksum stream to match
+ if (checksumSkip > 0) {
+ checksumIn.skip(checksumSkip);
+ }
+ }
+
+ lastChunkOffset = firstChunkOffset;
+ lastChunkLen = -1;
+ }
+
+ @Override
+ public synchronized int read(byte[] buf, int off, int len) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("read off " + off + " len " + len);
+ }
+ if (checksum == null) {
+ return dataIn.read(buf, off, len);
+ } else {
+ return super.read(buf, off, len);
+ }
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("skip " + n);
+ }
+ if (checksum == null) {
+ return dataIn.skip(n);
+ } else {
+ return super.skip(n);
+ }
+ }
+
+ @Override
+ public synchronized void seek(long n) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("seek " + n);
+ }
+ throw new IOException("Seek() is not supported in BlockReaderLocal");
+ }
+
+ @Override
+ protected synchronized int readChunk(long pos, byte[] buf, int offset,
+ int len, byte[] checksumBuf) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reading chunk from position " + pos + " at offset " +
+ offset + " with length " + len);
+ }
+
+ if (gotEOS) {
+ startOffset = -1;
+ return -1;
+ }
+
+ if (checksumBuf.length != checksumSize) {
+ throw new IOException("Cannot read checksum into buffer. "
+ + "The buffer must be exactly '" + checksumSize
+ + "' bytes long to hold the checksum bytes.");
+ }
+
+ if ((pos + firstChunkOffset) != lastChunkOffset) {
+ throw new IOException("Mismatch in pos : " + pos + " + "
+ + firstChunkOffset + " != " + lastChunkOffset);
+ }
+
+ int nRead = dataIn.read(buf, offset, bytesPerChecksum);
+ if (nRead < bytesPerChecksum) {
+ gotEOS = true;
+ }
+
+ lastChunkOffset += nRead;
+ lastChunkLen = nRead;
+
+ // If verifyChecksum is false, we omit reading the checksum
+ if (checksumIn != null) {
+ int nChecksumRead = checksumIn.read(checksumBuf);
+ if (nChecksumRead != checksumSize) {
+ throw new IOException("Could not read checksum at offset " +
+ checksumIn.getChannel().position() + " from the meta file.");
+ }
+ }
+
+ return nRead;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (dataIn != null) {
+ dataIn.close();
+ dataIn = null;
+ }
+ if (checksumIn != null) {
+ checksumIn.close();
+ checksumIn = null;
+ }
+ }
+}
\ No newline at end of file
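For reference, the LocalDatanodeInfo cache above gets LRU eviction from
LinkedHashMap's access-order mode plus an overridden removeEldestEntry().
A minimal standalone sketch of the same pattern, with a tiny capacity and
String keys so the eviction is visible (names and sizes here are
illustrative, not from the patch):

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LruCacheSketch {
      public static void main(String[] args) {
        final int cacheSize = 3; // small so eviction is easy to see
        // accessOrder=true: iteration order follows most-recent access,
        // so the eldest entry is the least recently used one.
        Map<String, String> cache = Collections.synchronizedMap(
            new LinkedHashMap<String, String>(16, 0.75f, true) {
              @Override
              protected boolean removeEldestEntry(
                  Map.Entry<String, String> eldest) {
                return size() > cacheSize; // drop the LRU entry
              }
            });
        cache.put("blk_1", "a");
        cache.put("blk_2", "b");
        cache.put("blk_3", "c");
        cache.get("blk_1");      // touch blk_1: now most recently used
        cache.put("blk_4", "d"); // evicts blk_2, the least recently used
        System.out.println(cache.keySet()); // [blk_3, blk_1, blk_4]
      }
    }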
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Wed Nov 23 00:03:30 2011
@@ -93,6 +93,7 @@ public class DFSClient implements FSCons
final int writePacketSize;
private final FileSystem.Statistics stats;
private int maxBlockAcquireFailures;
+ private boolean shortCircuitLocalReads;
/**
* We assume we're talking to another CDH server, which supports
@@ -144,6 +145,7 @@ public class DFSClient implements FSCons
rpcNamenode, methodNameToPolicyMap);
}
+ /** Create {@link ClientDatanodeProtocol} proxy with block/token */
static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
DatanodeID datanodeid, Configuration conf,
Block block, Token<BlockTokenIdentifier> token, int socketTimeout) throws IOException {
@@ -160,6 +162,20 @@ public class DFSClient implements FSCons
.getDefaultSocketFactory(conf), socketTimeout);
}
+ /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
+ static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+ DatanodeID datanodeid, Configuration conf, int socketTimeout)
+ throws IOException {
+ InetSocketAddress addr = NetUtils.createSocketAddr(
+ datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+ if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
+ ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
+ }
+ return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
+ ClientDatanodeProtocol.versionID, addr, conf, NetUtils
+ .getDefaultSocketFactory(conf), socketTimeout);
+ }
+
/**
* Same as this(NameNode.getAddress(conf), conf);
* @see #DFSClient(InetSocketAddress, Configuration)
@@ -206,7 +222,7 @@ public class DFSClient implements FSCons
// dfs.write.packet.size is an internal config variable
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
-
+
ugi = UserGroupInformation.getCurrentUser();
String taskId = conf.get("mapred.task.id");
@@ -229,6 +245,13 @@ public class DFSClient implements FSCons
"Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
+ "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
}
+ // read directly from the block file if configured.
+ this.shortCircuitLocalReads = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Short circuit read is " + shortCircuitLocalReads);
+ }
}
static int getMaxBlockAcquireFailures(Configuration conf) {
@@ -325,6 +348,82 @@ public class DFSClient implements FSCons
}
/**
+ * Get {@link BlockReader} for short circuited local reads.
+ */
+ private static BlockReader getLocalBlockReader(Configuration conf,
+ String src, Block blk, Token<BlockTokenIdentifier> accessToken,
+ DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
+ throws InvalidToken, IOException {
+ try {
+ return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
+ chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
+ - offsetIntoBlock);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(InvalidToken.class,
+ AccessControlException.class);
+ }
+ }
+
+ private static Set<String> localIpAddresses = Collections
+ .synchronizedSet(new HashSet<String>());
+
+ private static boolean isLocalAddress(InetSocketAddress targetAddr) {
+ InetAddress addr = targetAddr.getAddress();
+ if (localIpAddresses.contains(addr.getHostAddress())) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Address " + targetAddr + " is local");
+ }
+ return true;
+ }
+
+ // Check if the address is the wildcard or loopback address
+ boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
+
+ // Check if the address is defined on any interface
+ if (!local) {
+ try {
+ local = NetworkInterface.getByInetAddress(addr) != null;
+ } catch (SocketException e) {
+ local = false;
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Address " + targetAddr + " is " + (local ? "local" : "not local"));
+ }
+ if (local) {
+ localIpAddresses.add(addr.getHostAddress());
+ }
+ return local;
+ }
+
+ /**
+ * Should the block access token be refetched on an exception
+ *
+ * @param ex Exception received
+ * @param targetAddr Target datanode address from where exception was received
+ * @return true if block access token has expired or invalid and it should be
+ * refetched
+ */
+ private static boolean tokenRefetchNeeded(IOException ex,
+ InetSocketAddress targetAddr) {
+ /*
+ * Get a new access token and retry. Retry is needed in 2 cases. 1) When
+ * both the NN and DN restart while the DFSClient holds a cached access
+ * token. 2) When the NN fails to update its access key at the pre-set
+ * interval (by a wide margin) and subsequently restarts. In this case, the
+ * DN re-registers itself with the NN and receives a new access key, but
+ * the DN will delete the old access key from its memory since it's
+ * considered expired based on the estimated expiration date.
+ */
+ if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
+ LOG.info("Access token was invalid when connecting to " + targetAddr
+ + " : " + ex);
+ return true;
+ }
+ return false;
+ }
+
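Both read paths in this patch use tokenRefetchNeeded() in the same retry
shape: try the read, and on an invalid-token failure refetch the token once
and loop. A self-contained sketch of that shape (the BlockSource interface
and the string-based token check here are placeholders, not the actual
DFSClient code):

    import java.io.IOException;

    public class TokenRetrySketch {
      interface BlockSource { byte[] read() throws IOException; }

      // Placeholder for tokenRefetchNeeded(): retry only on token errors.
      static boolean tokenRefetchNeeded(IOException ex) {
        return ex.getMessage() != null
            && ex.getMessage().contains("InvalidToken");
      }

      static byte[] readWithOneRefetch(BlockSource src, Runnable refetch)
          throws IOException {
        int refetchToken = 1; // allow a single token refetch
        while (true) {
          try {
            return src.read();
          } catch (IOException ex) {
            if (refetchToken > 0 && tokenRefetchNeeded(ex)) {
              refetchToken--;
              refetch.run(); // get a fresh access token, then retry
            } else {
              throw ex; // not a token problem, or already retried
            }
          }
        }
      }
    }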
+ /**
* Cancel a delegation token
* @param token the token to cancel
* @throws InvalidToken
@@ -1312,16 +1411,16 @@ public class DFSClient implements FSCons
private Socket dnSock; //for now just sending checksumOk.
private DataInputStream in;
- private DataChecksum checksum;
- private long lastChunkOffset = -1;
- private long lastChunkLen = -1;
+ protected DataChecksum checksum;
+ protected long lastChunkOffset = -1;
+ protected long lastChunkLen = -1;
private long lastSeqNo = -1;
- private long startOffset;
- private long firstChunkOffset;
- private int bytesPerChecksum;
- private int checksumSize;
- private boolean gotEOS = false;
+ protected long startOffset;
+ protected long firstChunkOffset;
+ protected int bytesPerChecksum;
+ protected int checksumSize;
+ protected boolean gotEOS = false;
byte[] skipBuf = null;
ByteBuffer checksumBytes = null;
@@ -1358,7 +1457,8 @@ public class DFSClient implements FSCons
int nRead = super.read(buf, off, len);
// if gotEOS was set in the previous read and checksum is enabled :
- if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
+ if (dnSock != null && gotEOS && !eosBefore && nRead >= 0
+ && needChecksum()) {
//checksum is verified and there are no errors.
checksumOk(dnSock);
}
@@ -1536,14 +1636,44 @@ public class DFSClient implements FSCons
checksumSize = this.checksum.getChecksumSize();
}
+ /**
+ * Constructor used by {@link BlockReaderLocal} when checksums are skipped.
+ */
+ BlockReader(Path file, int numRetries) {
+ super(file, numRetries);
+ }
+
+ protected BlockReader(Path file, int numRetries, DataChecksum checksum,
+ boolean verifyChecksum) {
+ super(file,
+ numRetries,
+ verifyChecksum,
+ checksum.getChecksumSize() > 0? checksum : null,
+ checksum.getBytesPerChecksum(),
+ checksum.getChecksumSize());
+ }
+
public static BlockReader newBlockReader(Socket sock, String file, long blockId, Token<BlockTokenIdentifier> accessToken,
long genStamp, long startOffset, long len, int bufferSize) throws IOException {
return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
true);
}
- /** Java Doc required */
- public static BlockReader newBlockReader( Socket sock, String file, long blockId,
+ /**
+ * Creates a new {@link BlockReader} for the given blockId.
+ * @param sock Socket to read the block.
+ * @param file File to which this block belongs.
+ * @param blockId Block id.
+ * @param accessToken Block access token.
+ * @param genStamp Generation stamp of the block.
+ * @param startOffset Start offset for the data.
+ * @param len Length to be read.
+ * @param bufferSize Buffer size to use.
+ * @param verifyChecksum Whether checksum verification is required.
+ * @return BlockReader object.
+ * @throws IOException
+ */
+ public static BlockReader newBlockReader(Socket sock, String file, long blockId,
Token<BlockTokenIdentifier> accessToken,
long genStamp,
long startOffset, long len,
@@ -1887,6 +2017,14 @@ public class DFSClient implements FSCons
return blockRange;
}
+ private boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
+ throws IOException {
+ if (shortCircuitLocalReads && isLocalAddress(targetAddr)) {
+ return true;
+ }
+ return false;
+ }
+
/**
* Open a DataInputStream to a DataNode so that it can be read from.
* We get block ID and the IDs of the destinations at startup, from the namenode.
@@ -1923,13 +2061,37 @@ public class DFSClient implements FSCons
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
+ // try reading the block locally. if this fails, then go via
+ // the datanode
+ Block blk = targetBlock.getBlock();
+ Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+ if (shouldTryShortCircuitRead(targetAddr)) {
+ try {
+ blockReader = getLocalBlockReader(conf, src, blk, accessToken,
+ chosenNode, DFSClient.this.socketTimeout, offsetIntoBlock);
+ return chosenNode;
+ } catch (AccessControlException ex) {
+ LOG.warn("Short circuit access failed ", ex);
+ //Disable short circuit reads
+ shortCircuitLocalReads = false;
+ } catch (IOException ex) {
+ if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
+ /* Get a new access token and retry. */
+ refetchToken--;
+ fetchBlockAt(target);
+ continue;
+ } else {
+ LOG.info("Failed to read block " + targetBlock.getBlock()
+ + " on local machine" + StringUtils.stringifyException(ex));
+ LOG.info("Try reading via the datanode on " + targetAddr);
+ }
+ }
+ }
+
try {
s = socketFactory.createSocket();
NetUtils.connect(s, targetAddr, socketTimeout);
s.setSoTimeout(socketTimeout);
- Block blk = targetBlock.getBlock();
- Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
-
blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
accessToken,
blk.getGenerationStamp(),
@@ -1937,20 +2099,7 @@ public class DFSClient implements FSCons
buffersize, verifyChecksum, clientName);
return chosenNode;
} catch (IOException ex) {
- if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
- LOG.info("Will fetch a new access token and retry, "
- + "access token was invalid when connecting to " + targetAddr
- + " : " + ex);
- /*
- * Get a new access token and retry. Retry is needed in 2 cases. 1)
- * When both NN and DN re-started while DFSClient holding a cached
- * access token. 2) In the case that NN fails to update its
- * access key at pre-set interval (by a wide margin) and
- * subsequently restarts. In this case, DN re-registers itself with
- * NN and receives a new access key, but DN will delete the old
- * access key from its memory since it's considered expired based on
- * the estimated expiration date.
- */
+ if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
refetchToken--;
fetchBlockAt(target);
} else {
@@ -1965,8 +2114,7 @@ public class DFSClient implements FSCons
if (s != null) {
try {
s.close();
- } catch (IOException iex) {
- }
+ } catch (IOException iex) { }
}
s = null;
}
@@ -2154,21 +2302,31 @@ public class DFSClient implements FSCons
DatanodeInfo chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
BlockReader reader = null;
-
+
+ int len = (int) (end - start + 1);
try {
- dn = socketFactory.createSocket();
- NetUtils.connect(dn, targetAddr, socketTimeout);
- dn.setSoTimeout(socketTimeout);
Token<BlockTokenIdentifier> accessToken = block.getBlockToken();
-
- int len = (int) (end - start + 1);
-
- reader = BlockReader.newBlockReader(dn, src,
- block.getBlock().getBlockId(),
- accessToken,
- block.getBlock().getGenerationStamp(),
- start, len, buffersize,
- verifyChecksum, clientName);
+ // first try reading the block locally.
+ if (shouldTryShortCircuitRead(targetAddr)) {
+ try {
+ reader = getLocalBlockReader(conf, src, block.getBlock(),
+ accessToken, chosenNode, DFSClient.this.socketTimeout, start);
+ } catch (AccessControlException ex) {
+ LOG.warn("Short circuit access failed ", ex);
+ //Disable short circuit reads
+ shortCircuitLocalReads = false;
+ continue;
+ }
+ } else {
+ // go to the datanode
+ dn = socketFactory.createSocket();
+ NetUtils.connect(dn, targetAddr, socketTimeout);
+ dn.setSoTimeout(socketTimeout);
+ reader = BlockReader.newBlockReader(dn, src,
+ block.getBlock().getBlockId(), accessToken,
+ block.getBlock().getGenerationStamp(), start, len, buffersize,
+ verifyChecksum, clientName);
+ }
int nread = reader.readAll(buf, offset, len);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
@@ -2181,10 +2339,7 @@ public class DFSClient implements FSCons
e.getPos() + " from " + chosenNode.getName());
reportChecksumFailure(src, block.getBlock(), chosenNode);
} catch (IOException e) {
- if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
- LOG.info("Will get a new access token and retry, "
- + "access token was invalid when connecting to " + targetAddr
- + " : " + e);
+ if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
refetchToken--;
fetchBlockAt(block.getStartOffset());
continue;
@@ -3314,7 +3469,8 @@ public class DFSClient implements FSCons
} catch (IOException ie) {
- LOG.info("Exception in createBlockOutputStream " + ie);
+ LOG.info("Exception in createBlockOutputStream " + nodes[0].getName() +
+ " " + ie);
// find the datanode that matches
if (firstBadLink.length() != 0) {
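One detail worth calling out from the DFSClient changes: the new
isLocalAddress() check reduces to loopback/wildcard tests plus a
NetworkInterface lookup. A runnable sketch of just that check (the host
names in main() are only examples):

    import java.net.InetAddress;
    import java.net.NetworkInterface;
    import java.net.SocketException;
    import java.net.UnknownHostException;

    public class LocalAddressSketch {
      static boolean isLocal(InetAddress addr) {
        // Loopback (127.0.0.1, ::1) and wildcard (0.0.0.0) are local.
        if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()) {
          return true;
        }
        // Otherwise local only if some interface on this host owns it.
        try {
          return NetworkInterface.getByInetAddress(addr) != null;
        } catch (SocketException e) {
          return false;
        }
      }

      public static void main(String[] args) throws UnknownHostException {
        System.out.println(isLocal(InetAddress.getByName("127.0.0.1")));
        System.out.println(isLocal(InetAddress.getByName("example.org")));
      }
    }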
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Nov 23 00:03:30 2011
@@ -197,6 +197,10 @@ public class DFSConfigKeys extends Commo
public static final int DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
public static final int DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT = 100;
+ public static final String DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
+ public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
+ public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
+ public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
//Keys with no defaults
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
@@ -219,4 +223,5 @@ public class DFSConfigKeys extends Commo
public static final String DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY = "dfs.web.authentication.kerberos.principal";
public static final String DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
+ public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
}
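Putting the new keys together, a client that wants short circuit reads
would configure roughly the following (this mirrors what the new test
does; the "hbase" user name is only an example value):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ShortCircuitConfSketch {
      public static Configuration create() {
        Configuration conf = new Configuration();
        // Client side: turn on the short circuit read path.
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
        // Client side: keep checksum verification (the default).
        conf.setBoolean(
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
            false);
        // Datanode side: only this user may request local block paths.
        conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, "hbase");
        return conf;
      }
    }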
Added: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java?rev=1205243&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java Wed Nov 23 00:03:30 2011
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+
+/**
+ * A block and the full path information to the block data file and
+ * the metadata file stored on the local file system.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockLocalPathInfo implements Writable {
+ static final WritableFactory FACTORY = new WritableFactory() {
+ public Writable newInstance() { return new BlockLocalPathInfo(); }
+ };
+ static { // register a ctor
+ WritableFactories.setFactory(BlockLocalPathInfo.class, FACTORY);
+ }
+
+ private Block block;
+ private String localBlockPath = ""; // local file storing the data
+ private String localMetaPath = ""; // local file storing the checksum
+
+ public BlockLocalPathInfo() {}
+
+ /**
+ * Constructs BlockLocalPathInfo.
+ * @param b The block corresponding to this local path info.
+ * @param file Block data file.
+ * @param metafile Metadata file for the block.
+ */
+ public BlockLocalPathInfo(Block b, String file, String metafile) {
+ block = b;
+ localBlockPath = file;
+ localMetaPath = metafile;
+ }
+
+ /**
+ * Get the Block data file.
+ * @return Block data file.
+ */
+ public String getBlockPath() {return localBlockPath;}
+
+ /**
+ * Get the Block metadata file.
+ * @return Block metadata file.
+ */
+ public String getMetaPath() {return localMetaPath;}
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ block.write(out);
+ Text.writeString(out, localBlockPath);
+ Text.writeString(out, localMetaPath);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ block = new Block();
+ block.readFields(in);
+ localBlockPath = Text.readString(in);
+ localMetaPath = Text.readString(in);
+ }
+
+ /**
+ * Get number of bytes in the block.
+ * @return Number of bytes in the block.
+ */
+ public long getNumBytes() {
+ return block.getNumBytes();
+ }
+}
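Since BlockLocalPathInfo is a Writable, it round-trips through a byte
stream via write()/readFields(). A small sketch of that round trip (the
block values and file paths are arbitrary examples):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;

    public class PathInfoRoundTrip {
      public static void main(String[] args) throws IOException {
        Block blk = new Block(1L, 1024L, 100L); // id, numBytes, genstamp
        BlockLocalPathInfo out = new BlockLocalPathInfo(blk,
            "/data/current/blk_1", "/data/current/blk_1_100.meta");

        // write(): the block fields followed by the two path strings
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        out.write(new DataOutputStream(buf));

        // readFields(): rebuild an equivalent object from the bytes
        BlockLocalPathInfo in = new BlockLocalPathInfo();
        in.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(in.getBlockPath() + " " + in.getNumBytes());
      }
    }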
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Wed Nov 23 00:03:30 2011
@@ -21,12 +21,18 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
/** A client-datanode protocol for block recovery
*/
+@KerberosInfo(
+ serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
@TokenInfo(BlockTokenSelector.class)
public interface ClientDatanodeProtocol extends VersionedProtocol {
public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
@@ -55,4 +61,29 @@ public interface ClientDatanodeProtocol
* @throws IOException if the block does not exist
*/
Block getBlockInfo(Block block) throws IOException;
+
+ /**
+ * Retrieves the path names of the block file and metadata file stored on the
+ * local file system.
+ *
+ * In order for this method to work, both of the following should be satisfied:
+ * <ul>
+ * <li>
+ * The client user must be configured at the datanode to be able to use this
+ * method.</li>
+ * <li>
+ * When security is enabled, kerberos authentication must be used to connect
+ * to the datanode.</li>
+ * </ul>
+ *
+ * @param block
+ * the specified block on the local datanode
+ * @param token
+ * the block access token.
+ * @return the BlockLocalPathInfo of a block
+ * @throws IOException
+ * on error
+ */
+ BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+ Token<BlockTokenIdentifier> token) throws IOException;
}
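A client reaches getBlockLocalPathInfo() through an RPC proxy, much like
the kerberos-ticket helper added to DFSClient in this commit. A sketch of
such a call (the address and timeout are placeholders):

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.net.NetUtils;
    import org.apache.hadoop.security.token.Token;

    public class LocalPathInfoCall {
      static BlockLocalPathInfo fetch(Configuration conf, Block blk,
          Token<BlockTokenIdentifier> token) throws IOException {
        // The datanode's IPC endpoint on this host (placeholder port).
        InetSocketAddress addr = NetUtils.createSocketAddr("127.0.0.1:50020");
        ClientDatanodeProtocol proxy = (ClientDatanodeProtocol) RPC.getProxy(
            ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID,
            addr, conf, NetUtils.getDefaultSocketFactory(conf), 60000);
        try {
          // Ask the local datanode for the block and meta file paths.
          return proxy.getBlockLocalPathInfo(blk, token);
        } finally {
          RPC.stopProxy(proxy); // always release the proxy
        }
      }
    }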
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Wed Nov 23 00:03:30 2011
@@ -26,6 +26,8 @@ import java.io.IOException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
/**
@@ -33,7 +35,9 @@ import org.apache.hadoop.util.DataChecks
* This is not related to the Block related functionality in Namenode.
* The biggest part of data block metadata is CRC for the block.
*/
-class BlockMetadataHeader {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMetadataHeader {
static final short METADATA_VERSION = FSDataset.METADATA_VERSION;
@@ -49,12 +53,14 @@ class BlockMetadataHeader {
this.checksum = checksum;
this.version = version;
}
-
- short getVersion() {
+
+ /** Get the version */
+ public short getVersion() {
return version;
}
- DataChecksum getChecksum() {
+ /** Get the checksum */
+ public DataChecksum getChecksum() {
return checksum;
}
@@ -65,7 +71,7 @@ class BlockMetadataHeader {
* @return Metadata Header
* @throws IOException
*/
- static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+ public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
return readHeader(in.readShort(), in);
}
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Nov 23 00:03:30 2011
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
@@ -62,6 +64,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -71,6 +74,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -110,8 +114,10 @@ import org.apache.hadoop.metrics2.lib.De
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -225,6 +231,7 @@ public class DataNode extends Configured
boolean isBlockTokenEnabled;
BlockTokenSecretManager blockTokenSecretManager;
boolean isBlockTokenInitialized = false;
+ final String userWithLocalPathAccess;
/**
* Testing hook that allows tests to delay the sending of blockReceived RPCs
@@ -286,6 +293,8 @@ public class DataNode extends Configured
datanodeObject = this;
supportAppends = conf.getBoolean("dfs.support.append", false);
+ this.userWithLocalPathAccess = conf
+ .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
try {
startDataNode(conf, dataDirs, resources);
} catch (IOException ie) {
@@ -1751,6 +1760,89 @@ public class DataNode extends Configured
throw new IOException("Unknown protocol to " + getClass().getSimpleName()
+ ": " + protocol);
}
+
+ /** Ensure the authentication method is kerberos */
+ private void checkKerberosAuthMethod(String msg) throws IOException {
+ // Without security there is no kerberos requirement to enforce
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+ if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() !=
+ AuthenticationMethod.KERBEROS) {
+ throw new AccessControlException("Error in "+msg+". Only "
+ + "kerberos based authentication is allowed.");
+ }
+ }
+
+ private void checkBlockLocalPathAccess() throws IOException {
+ checkKerberosAuthMethod("getBlockLocalPathInfo()");
+ String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+ if (!currentUser.equals(this.userWithLocalPathAccess)) {
+ throw new AccessControlException(
+ "Can't continue with getBlockLocalPathInfo() "
+ + "authorization. The user " + currentUser
+ + " is not allowed to call getBlockLocalPathInfo");
+ }
+ }
+
+ @Override
+ public BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+ Token<BlockTokenIdentifier> token) throws IOException {
+ checkBlockLocalPathAccess();
+ checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
+ BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
+ if (LOG.isTraceEnabled()) {
+ if (info != null) {
+ LOG.trace("getBlockLocalPathInfo successful block=" + block
+ + " blockfile " + info.getBlockPath() + " metafile "
+ + info.getMetaPath());
+ } else {
+ LOG.trace("getBlockLocalPathInfo for block=" + block
+ + " returning null");
+ }
+ }
+ myMetrics.incrBlocksGetLocalPathInfo();
+ return info;
+ }
+
+ private void checkBlockToken(Block block, Token<BlockTokenIdentifier> token,
+ AccessMode accessMode) throws IOException {
+ if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
+ BlockTokenIdentifier id = new BlockTokenIdentifier();
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ id.readFields(in);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got: " + id.toString());
+ }
+ blockTokenSecretManager.checkAccess(id, null, block, accessMode);
+ }
+ }
+
+ /** Check block access token for the given access mode */
+ private void checkBlockToken(Block block,
+ BlockTokenSecretManager.AccessMode accessMode) throws IOException {
+ if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
+ Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
+ .getTokenIdentifiers();
+ if (tokenIds.size() != 1) {
+ throw new IOException("Can't continue with "
+ + "authorization since " + tokenIds.size()
+ + " BlockTokenIdentifier " + "is found.");
+ }
+ for (TokenIdentifier tokenId : tokenIds) {
+ BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got: " + id.toString());
+ }
+ blockTokenSecretManager.checkAccess(id, null, block, accessMode);
+ }
+ }
+ }
/** A convenient class used in lease recovery */
private static class BlockRecord {
@@ -1952,28 +2044,13 @@ public class DataNode extends Configured
public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
) throws IOException {
logRecoverBlock("Client", block, targets);
- if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
- Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
- .getTokenIdentifiers();
- if (tokenIds.size() != 1) {
- throw new IOException("Can't continue with recoverBlock() "
- + "authorization since " + tokenIds.size() + " BlockTokenIdentifier "
- + "is found.");
- }
- for (TokenIdentifier tokenId : tokenIds) {
- BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got: " + id.toString());
- }
- blockTokenSecretManager.checkAccess(id, null, block,
- BlockTokenSecretManager.AccessMode.WRITE);
- }
- }
+ checkBlockToken(block, BlockTokenSecretManager.AccessMode.WRITE);
return recoverBlock(block, keepLength, targets, false);
}
/** {@inheritDoc} */
public Block getBlockInfo(Block block) throws IOException {
+ checkBlockToken(block, BlockTokenSecretManager.AccessMode.READ);
Block stored = data.getStoredBlock(block.getBlockId());
return stored;
}
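The checkBlockToken() helper above decodes the BlockTokenIdentifier out of
the token's raw identifier bytes before checking access. That decoding
step in isolation looks like this sketch:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    public class TokenDecodeSketch {
      static BlockTokenIdentifier decode(Token<BlockTokenIdentifier> token)
          throws IOException {
        BlockTokenIdentifier id = new BlockTokenIdentifier();
        // getIdentifier() returns the serialized identifier bytes;
        // readFields() repopulates the identifier from that stream.
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(token.getIdentifier()));
        id.readFields(in);
        return id;
      }
    }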
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Nov 23 00:03:30 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
@@ -1072,6 +1073,16 @@ public class FSDataset implements FSCons
return f;
}
+ @Override //FSDatasetInterface
+ public BlockLocalPathInfo getBlockLocalPathInfo(Block block)
+ throws IOException {
+ File datafile = getBlockFile(block);
+ File metafile = getMetaFile(datafile, block);
+ BlockLocalPathInfo info = new BlockLocalPathInfo(block,
+ datafile.getAbsolutePath(), metafile.getAbsolutePath());
+ return info;
+ }
+
public synchronized InputStream getBlockInputStream(Block b) throws IOException {
return new FileInputStream(getBlockFile(b));
}
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Wed Nov 23 00:03:30 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
import java.io.Closeable;
+import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -30,6 +31,7 @@ import java.io.OutputStream;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -333,4 +335,8 @@ public interface FSDatasetInterface exte
public BlockRecoveryInfo startBlockRecovery(long blockId) throws IOException;
+ /**
+ * Get {@link BlockLocalPathInfo} for the given block.
+ **/
+ public BlockLocalPathInfo getBlockLocalPathInfo(Block b) throws IOException;
}
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java Wed Nov 23 00:03:30 2011
@@ -49,6 +49,8 @@ public class DataNodeInstrumentation imp
registry.newCounter("blocks_verified", "", 0);
final MetricMutableCounterInt blockVerificationFailures =
registry.newCounter("block_verification_failures", "", 0);
+ final MetricMutableCounterInt blocksGetLocalPathInfo =
+ registry.newCounter("blocks_get_local_pathinfo", "", 0);
final MetricMutableCounterInt readsFromLocalClient =
registry.newCounter("reads_from_local_client", "", 0);
@@ -131,6 +133,11 @@ public class DataNodeInstrumentation imp
}
//@Override
+ public void incrBlocksGetLocalPathInfo() {
+ blocksGetLocalPathInfo.incr();
+ }
+
+ //@Override
public void addReadBlockOp(long latency) {
readBlockOp.add(latency);
}
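The new metric follows the existing pattern in DataNodeInstrumentation:
register a mutable counter once, then bump it per operation. A compact
sketch of that pattern (the registry name is illustrative):

    import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
    import org.apache.hadoop.metrics2.lib.MetricsRegistry;

    public class CounterSketch {
      final MetricsRegistry registry = new MetricsRegistry("datanode");
      // Register once: metric name, description, initial value.
      final MetricMutableCounterInt blocksGetLocalPathInfo =
          registry.newCounter("blocks_get_local_pathinfo", "", 0);

      void onGetBlockLocalPathInfo() {
        blocksGetLocalPathInfo.incr(); // one local-path RPC served
      }
    }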
Modified: hadoop/common/branches/branch-0.20-security/src/test/commit-tests
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/commit-tests?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/commit-tests (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/commit-tests Wed Nov 23 00:03:30 2011
@@ -110,6 +110,7 @@
**/TestFileAppend.java
**/TestFileCorruption.java
**/TestFileLimit.java
+**/TestShortCircuitLocalRead.java
**/TestFileStatus.java
**/TestFSInputChecker.java
**/TestFSOutputSummer.java
Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1205243&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Wed Nov 23 00:03:30 2011
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Test for short circuit read functionality using {@link BlockReaderLocal}.
+ * When the client reading a block is co-located with the datanode storing
+ * that block, the short circuit read bypasses {@link DataTransferProtocol}
+ * and reads the block data directly from the files on the local file
+ * system.
+ */
+public class TestShortCircuitLocalRead {
+ static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+
+ static final long seed = 0xDEADBEEFL;
+ static final int blockSize = 5120;
+ boolean simulatedStorage = false;
+
+ // creates a file but does not close it
+ static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+ throws IOException {
+ FSDataOutputStream stm = fileSys.create(name, true,
+ fileSys.getConf().getInt("io.file.buffer.size", 4096),
+ (short)repl, (long)blockSize);
+ return stm;
+ }
+
+ static private void checkData(byte[] actual, int from, byte[] expected,
+ String message) {
+ checkData(actual, from, expected, actual.length, message);
+ }
+
+ static private void checkData(byte[] actual, int from, byte[] expected,
+ int len, String message) {
+ for (int idx = 0; idx < len; idx++) {
+ if (expected[from + idx] != actual[idx]) {
+ Assert.fail(message + " byte " + (from + idx) + " differs. expected "
+ + expected[from + idx] + " actual " + actual[idx]);
+ }
+ }
+ }
+
+ static void checkFileContent(FileSystem fs, Path name, byte[] expected,
+ int readOffset) throws IOException {
+ FSDataInputStream stm = fs.open(name);
+ byte[] actual = new byte[expected.length-readOffset];
+ stm.readFully(readOffset, actual);
+ checkData(actual, readOffset, expected, "Read 2");
+ stm.close();
+ // Now read using a different API.
+ actual = new byte[expected.length-readOffset];
+ stm = fs.open(name);
+ long skipped = stm.skip(readOffset);
+ Assert.assertEquals(readOffset, skipped);
+ //Read a small number of bytes first.
+ int nread = stm.read(actual, 0, 3);
+ nread += stm.read(actual, nread, 2);
+ //Read across chunk boundary
+ nread += stm.read(actual, nread, 517);
+ checkData(actual, readOffset, expected, nread, "A few bytes");
+ //Now read rest of it
+ while (nread < actual.length) {
+ int nbytes = stm.read(actual, nread, actual.length - nread);
+ if (nbytes < 0) {
+ throw new EOFException("End of file reached before reading fully.");
+ }
+ nread += nbytes;
+ }
+ checkData(actual, readOffset, expected, "Read 3");
+ stm.close();
+ }
+
+ /**
+ * Test that file data can be read by reading the block file
+ * directly from the local store.
+ */
+ public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
+ int readOffset) throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ ignoreChecksum);
+ conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ if (simulatedStorage) {
+ conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+ }
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fs = cluster.getFileSystem();
+ try {
+ // check that / exists
+ Path path = new Path("/");
+ assertTrue("/ should be a directory",
+ fs.getFileStatus(path).isDir() == true);
+
+ byte[] fileData = AppendTestUtil.randomBytes(seed, size);
+ // create a new file in home directory; createFile leaves the stream open
+ Path file1 = new Path("filelocal.dat");
+ FSDataOutputStream stm = createFile(fs, file1, 1);
+
+ // write to file
+ stm.write(fileData);
+ stm.close();
+ checkFileContent(fs, file1, fileData, readOffset);
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testFileLocalReadNoChecksum() throws IOException {
+ doTestShortCircuitRead(true, 3*blockSize+100, 0);
+ }
+
+ @Test
+ public void testFileLocalReadChecksum() throws IOException {
+ doTestShortCircuitRead(false, 3*blockSize+100, 0);
+ }
+
+ @Test
+ public void testSmallFileLocalRead() throws IOException {
+ doTestShortCircuitRead(false, 13, 0);
+ doTestShortCircuitRead(false, 13, 5);
+ doTestShortCircuitRead(true, 13, 0);
+ doTestShortCircuitRead(true, 13, 5);
+ }
+
+ @Test
+ public void testReadFromAnOffset() throws IOException {
+ doTestShortCircuitRead(false, 3*blockSize+100, 777);
+ doTestShortCircuitRead(true, 3*blockSize+100, 777);
+ }
+
+ @Test
+ public void testLongFile() throws IOException {
+ doTestShortCircuitRead(false, 10*blockSize+100, 777);
+ doTestShortCircuitRead(true, 10*blockSize+100, 777);
+ }
+
+ @Test
+ public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+ final Configuration conf = new Configuration();
+ conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, "alloweduser");
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+ cluster.waitActive();
+ final DataNode dn = cluster.getDataNodes().get(0);
+ FileSystem fs = cluster.getFileSystem();
+ try {
+ DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
+ UserGroupInformation aUgi = UserGroupInformation
+ .createRemoteUser("alloweduser");
+ LocatedBlocks lb = cluster.getNameNode().getBlockLocations("/tmp/x", 0,
+ 16);
+ // Create a new block object, because the block inside LocatedBlock at
+ // namenode is of type BlockInfo.
+ Block blk = new Block(lb.get(0).getBlock());
+ Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
+ final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
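+ // Create the datanode proxy as the allowed user so the RPC call below is
+ // authorized under that identity.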
+ ClientDatanodeProtocol proxy = aUgi
+ .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+ @Override
+ public ClientDatanodeProtocol run() throws Exception {
+ return DFSClient.createClientDatanodeProtocolProxy(
+ dnInfo, conf, 60000);
+ }
+ });
+
+ //This should succeed
+ BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
+ Assert.assertEquals(dn.data.getBlockLocalPathInfo(blk).getBlockPath(),
+ blpi.getBlockPath());
+ RPC.stopProxy(proxy);
+
+ // Now try with a not allowed user.
+ UserGroupInformation bUgi = UserGroupInformation
+ .createRemoteUser("notalloweduser");
+ proxy = bUgi
+ .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+ @Override
+ public ClientDatanodeProtocol run() throws Exception {
+ return DFSClient.createClientDatanodeProtocolProxy(
+ dnInfo, conf, 60000);
+ }
+ });
+ try {
+ proxy.getBlockLocalPathInfo(blk, token);
+ Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+ + " is not allowed to call getBlockLocalPathInfo");
+ } catch (IOException ex) {
+ Assert.assertTrue(ex.getMessage().contains(
+ "not allowed to call getBlockLocalPathInfo"));
+ } finally {
+ RPC.stopProxy(proxy);
+ }
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test to run benchmarks between shortcircuit read vs regular read with
+ * specified number of threads simultaneously reading.
+ * <br>
+ * Run this using the following command:
+ * bin/hadoop --config confdir \
+ * org.apache.hadoop.hdfs.TestShortCircuitLocalRead \
+ * <shortcircuit on?> <checsum on?> <Number of threads>
+ */
+ public static void main(String[] args) throws Exception {
+ ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.INFO);
+ ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.INFO);
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.INFO);
+
+ if (args.length != 3) {
+ System.out.println("Usage: test shortcircuit checksum threadCount");
+ System.exit(1);
+ }
+ boolean shortcircuit = Boolean.parseBoolean(args[0]);
+ boolean skipChecksum = Boolean.parseBoolean(args[1]);
+ int threadCount = Integer.parseInt(args[2]);
+
+ // Setup create a file
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ skipChecksum);
+
+ // Override fileSize and dataToWrite with larger values for a heavier benchmark run
+ int fileSize = 1000 * blockSize + 100; // 1000 full blocks plus a 100-byte partial block
+ final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
+
+ // create a new file in home directory and fill it with the benchmark data
+ final Path file1 = new Path("filelocal.dat");
+ final FileSystem fs = FileSystem.get(conf);
+ FSDataOutputStream stm = createFile(fs, file1, 1);
+
+ stm.write(dataToWrite);
+ stm.close();
+
+ long start = System.currentTimeMillis();
+ final int iteration = 20;
+ Thread[] threads = new Thread[threadCount];
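+ // Each thread repeatedly re-reads the whole file; total wall-clock time
+ // across all threads is printed at the end.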
+ for (int i = 0; i < threadCount; i++) {
+ threads[i] = new Thread() {
+ public void run() {
+ for (int i = 0; i < iteration; i++) {
+ try {
+ checkFileContent(fs, file1, dataToWrite, 0);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+ }
+ for (int i = 0; i < threadCount; i++) {
+ threads[i].start();
+ }
+ for (int i = 0; i < threadCount; i++) {
+ threads[i].join();
+ }
+ long end = System.currentTimeMillis();
+ System.out.println("Iteration " + iteration + " took " + (end - start));
+ fs.delete(file1, false);
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1205243&r1=1205242&r2=1205243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Nov 23 00:03:30 2011
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -31,6 +32,7 @@ import javax.management.StandardMBean;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
@@ -710,4 +712,9 @@ public class SimulatedFSDataset impleme
Block stored = getStoredBlock(blockId);
return new BlockRecoveryInfo(stored, false);
}
+
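+ // Simulated storage keeps block contents in memory rather than in files
+ // on disk, so there is no local block path to hand to a client.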
+ @Override
+ public BlockLocalPathInfo getBlockLocalPathInfo(Block blk) throws IOException {
+ throw new IOException("getBlockLocalPathInfo not supported.");
+ }
}