You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2013/01/12 00:52:24 UTC
svn commit: r1432335 [2/2] - in
/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs: ./
dev-support/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/net/
src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer...
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jan 11 23:52:22 2013
@@ -53,16 +53,15 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
-import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
-import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.util.AbstractList;
@@ -90,6 +89,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.net.DomainPeerServer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -149,11 +149,11 @@ import org.apache.hadoop.io.ReadaheadPoo
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -233,6 +233,7 @@ public class DataNode extends Configured
LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
private static final String USAGE = "Usage: java DataNode [-rollback | -regular]";
+ static final int CURRENT_BLOCK_FORMAT_VERSION = 1;
/**
* Use {@link NetUtils#createSocketAddr(String)} instead.
@@ -250,6 +251,7 @@ public class DataNode extends Configured
public final static String EMPTY_DEL_HINT = "";
AtomicInteger xmitsInProgress = new AtomicInteger();
Daemon dataXceiverServer = null;
+ Daemon localDataXceiverServer = null;
ThreadGroup threadGroup = null;
private DNConf dnConf;
private volatile boolean heartbeatsDisabledForTests = false;
@@ -261,6 +263,7 @@ public class DataNode extends Configured
private String hostName;
private DatanodeID id;
+ final private String fileDescriptorPassingDisabledReason;
boolean isBlockTokenEnabled;
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
private boolean hasAnyBlockPoolRegistered = false;
@@ -309,6 +312,24 @@ public class DataNode extends Configured
this.getHdfsBlockLocationsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+
+ // Determine whether we should try to pass file descriptors to clients.
+ if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
+ String reason = DomainSocket.getLoadingFailureReason();
+ if (reason != null) {
+ LOG.warn("File descriptor passing is disabled because " + reason);
+ this.fileDescriptorPassingDisabledReason = reason;
+ } else {
+ LOG.info("File descriptor passing is enabled.");
+ this.fileDescriptorPassingDisabledReason = null;
+ }
+ } else {
+ this.fileDescriptorPassingDisabledReason =
+ "File descriptor passing was not configured.";
+ LOG.debug(this.fileDescriptorPassingDisabledReason);
+ }
+
try {
hostName = getHostName(conf);
LOG.info("Configured hostname is " + hostName);
@@ -537,6 +558,41 @@ public class DataNode extends Configured
this.dataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(tcpPeerServer, conf, this));
this.threadGroup.setDaemon(true); // auto destroy when empty
+
+ DomainPeerServer domainPeerServer =
+ getDomainPeerServer(conf, streamingAddr.getPort());
+ if (domainPeerServer != null) {
+ this.localDataXceiverServer = new Daemon(threadGroup,
+ new DataXceiverServer(domainPeerServer, conf, this));
+ LOG.info("Listening on UNIX domain socket: " +
+ domainPeerServer.getBindPath());
+ }
+ }
+
+ static DomainPeerServer getDomainPeerServer(Configuration conf,
+ int port) throws IOException {
+ String domainSocketPath =
+ conf.get(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+ if (domainSocketPath == null) {
+ if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
+ LOG.warn("Although short-circuit local reads are configured, " +
+ "they are disabled because you didn't configure " +
+ DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+ }
+ return null;
+ }
+ if (DomainSocket.getLoadingFailureReason() != null) {
+ throw new RuntimeException("Although a UNIX domain socket " +
+ "path is configured as " + domainSocketPath + ", we cannot " +
+ "start a localDataXceiverServer because " +
+ DomainSocket.getLoadingFailureReason());
+ }
+ DomainPeerServer domainPeerServer =
+ new DomainPeerServer(domainSocketPath, port);
+ domainPeerServer.setReceiveBufferSize(
+ HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+ return domainPeerServer;
}
// calls specific to BP
@@ -1039,6 +1095,42 @@ public class DataNode extends Configured
return info;
}
+ @InterfaceAudience.LimitedPrivate("HDFS")
+ static public class ShortCircuitFdsUnsupportedException extends IOException {
+ private static final long serialVersionUID = 1L;
+ public ShortCircuitFdsUnsupportedException(String msg) {
+ super(msg);
+ }
+ }
+
+ @InterfaceAudience.LimitedPrivate("HDFS")
+ static public class ShortCircuitFdsVersionException extends IOException {
+ private static final long serialVersionUID = 1L;
+ public ShortCircuitFdsVersionException(String msg) {
+ super(msg);
+ }
+ }
+
+ FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> token, int maxVersion)
+ throws ShortCircuitFdsUnsupportedException,
+ ShortCircuitFdsVersionException, IOException {
+ if (fileDescriptorPassingDisabledReason != null) {
+ throw new ShortCircuitFdsUnsupportedException(
+ fileDescriptorPassingDisabledReason);
+ }
+ checkBlockToken(blk, token, BlockTokenSecretManager.AccessMode.READ);
+ int blkVersion = CURRENT_BLOCK_FORMAT_VERSION;
+ if (maxVersion < blkVersion) {
+ throw new ShortCircuitFdsVersionException("Your client is too old " +
+ "to read this block! Its format version is " +
+ blkVersion + ", but the highest format version you can read is " +
+ maxVersion);
+ }
+ metrics.incrBlocksGetLocalPathInfo();
+ return data.getShortCircuitFdsForRead(blk);
+ }
+
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
List<Token<BlockTokenIdentifier>> tokens) throws IOException,
@@ -1113,32 +1205,45 @@ public class DataNode extends Configured
if (dataXceiverServer != null) {
((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
this.dataXceiverServer.interrupt();
-
- // wait for all data receiver threads to exit
- if (this.threadGroup != null) {
- int sleepMs = 2;
- while (true) {
- this.threadGroup.interrupt();
- LOG.info("Waiting for threadgroup to exit, active threads is " +
- this.threadGroup.activeCount());
- if (this.threadGroup.activeCount() == 0) {
- break;
- }
- try {
- Thread.sleep(sleepMs);
- } catch (InterruptedException e) {}
- sleepMs = sleepMs * 3 / 2; // exponential backoff
- if (sleepMs > 1000) {
- sleepMs = 1000;
- }
+ }
+ if (localDataXceiverServer != null) {
+ ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill();
+ this.localDataXceiverServer.interrupt();
+ }
+ // wait for all data receiver threads to exit
+ if (this.threadGroup != null) {
+ int sleepMs = 2;
+ while (true) {
+ this.threadGroup.interrupt();
+ LOG.info("Waiting for threadgroup to exit, active threads is " +
+ this.threadGroup.activeCount());
+ if (this.threadGroup.activeCount() == 0) {
+ break;
+ }
+ try {
+ Thread.sleep(sleepMs);
+ } catch (InterruptedException e) {}
+ sleepMs = sleepMs * 3 / 2; // exponential backoff
+ if (sleepMs > 1000) {
+ sleepMs = 1000;
}
}
- // wait for dataXceiveServer to terminate
+ this.threadGroup = null;
+ }
+ if (this.dataXceiverServer != null) {
+ // wait for dataXceiverServer to terminate
try {
this.dataXceiverServer.join();
} catch (InterruptedException ie) {
}
}
+ if (this.localDataXceiverServer != null) {
+ // wait for localDataXceiverServer to terminate
+ try {
+ this.localDataXceiverServer.join();
+ } catch (InterruptedException ie) {
+ }
+ }
if(blockPoolManager != null) {
try {
@@ -1523,6 +1628,9 @@ public class DataNode extends Configured
// start dataXceiveServer
dataXceiverServer.start();
+ if (localDataXceiverServer != null) {
+ localDataXceiverServer.start();
+ }
ipcServer.start();
startPlugins(conf);
}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Jan 11 23:52:22 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
import static org.apache.hadoop.util.Time.now;
@@ -28,6 +29,8 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
@@ -60,11 +63,14 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -233,6 +239,68 @@ class DataXceiver extends Receiver imple
}
@Override
+ public void requestShortCircuitFds(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> token,
+ int maxVersion) throws IOException {
+ updateCurrentThreadName("Passing file descriptors for block " + blk);
+ BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
+ FileInputStream fis[] = null;
+ try {
+ if (peer.getDomainSocket() == null) {
+ throw new IOException("You cannot pass file descriptors over " +
+ "anything but a UNIX domain socket.");
+ }
+ fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
+ bld.setStatus(SUCCESS);
+ bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
+ } catch (ShortCircuitFdsVersionException e) {
+ bld.setStatus(ERROR_UNSUPPORTED);
+ bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
+ bld.setMessage(e.getMessage());
+ } catch (ShortCircuitFdsUnsupportedException e) {
+ bld.setStatus(ERROR_UNSUPPORTED);
+ bld.setMessage(e.getMessage());
+ } catch (InvalidToken e) {
+ bld.setStatus(ERROR_ACCESS_TOKEN);
+ bld.setMessage(e.getMessage());
+ } catch (IOException e) {
+ bld.setStatus(ERROR);
+ bld.setMessage(e.getMessage());
+ }
+ try {
+ bld.build().writeDelimitedTo(socketOut);
+ if (fis != null) {
+ FileDescriptor fds[] = new FileDescriptor[fis.length];
+ for (int i = 0; i < fds.length; i++) {
+ fds[i] = fis[i].getFD();
+ }
+ byte buf[] = new byte[] { (byte)0 };
+ peer.getDomainSocket().
+ sendFileDescriptors(fds, buf, 0, buf.length);
+ }
+ } finally {
+ if (ClientTraceLog.isInfoEnabled()) {
+ DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
+ .getBlockPoolId());
+ BlockSender.ClientTraceLog.info(String.format(
+ String.format(
+ "src: %s, dest: %s, op: %s, blockid: %s, srvID: %s, " +
+ "success: %b",
+ "127.0.0.1", // src IP
+ "127.0.0.1", // dst IP
+ "REQUEST_SHORT_CIRCUIT_FDS", // operation
+ blk.getBlockId(), // block id
+ dnR.getStorageID(),
+ (fis != null)
+ )));
+ }
+ if (fis != null) {
+ IOUtils.cleanup(LOG, fis);
+ }
+ }
+ }
+
+ @Override
public void readBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Fri Jan 11 23:52:22 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
@@ -386,4 +387,6 @@ public interface FsDatasetSpi<V extends
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
throws IOException;
+ FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+ throws IOException;
}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Fri Jan 11 23:52:22 2013
@@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -1669,6 +1670,26 @@ class FsDatasetImpl implements FsDataset
}
@Override // FsDatasetSpi
+ public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+ throws IOException {
+ File datafile = getBlockFile(block);
+ File metafile = FsDatasetUtil.getMetaFile(datafile,
+ block.getGenerationStamp());
+ FileInputStream fis[] = new FileInputStream[2];
+ boolean success = false;
+ try {
+ fis[0] = new FileInputStream(datafile);
+ fis[1] = new FileInputStream(metafile);
+ success = true;
+ return fis;
+ } finally {
+ if (!success) {
+ IOUtils.cleanup(null, fis);
+ }
+ }
+ }
+
+ @Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
throws IOException {
// List of VolumeIds, one per volume on the datanode
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Fri Jan 11 23:52:22 2013
@@ -563,7 +563,7 @@ public class NamenodeFsck {
conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
getDataEncryptionKey()),
- chosenNode);
+ chosenNode, null, false);
} catch (IOException ex) {
// Put chosen node into dead list, continue
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto Fri Jan 11 23:52:22 2013
@@ -74,6 +74,8 @@ message DeleteBlockPoolResponseProto {
* Gets the file information where block and its metadata is stored
* block - block for which path information is being requested
* token - block token
+ *
+ * This message is deprecated in favor of file descriptor passing.
*/
message GetBlockLocalPathInfoRequestProto {
required ExtendedBlockProto block = 1;
@@ -84,6 +86,8 @@ message GetBlockLocalPathInfoRequestProt
* block - block for which file path information is being returned
* localPath - file path where the block data is stored
* localMetaPath - file path where the block meta data is stored
+ *
+ * This message is deprecated in favor of file descriptor passing.
*/
message GetBlockLocalPathInfoResponseProto {
required ExtendedBlockProto block = 1;
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Fri Jan 11 23:52:22 2013
@@ -114,6 +114,16 @@ message OpBlockChecksumProto {
required BaseHeaderProto header = 1;
}
+message OpRequestShortCircuitAccessProto {
+ required BaseHeaderProto header = 1;
+
+ /** In order to get short-circuit access to block data, clients must set this
+ * to the highest version of the block data that they can understand.
+ * Currently 1 is the only version, but more versions may exist in the future
+ * if the on-disk format changes.
+ */
+ required uint32 maxVersion = 2;
+}
message PacketHeaderProto {
// All fields must be fixed-length!
@@ -132,6 +142,7 @@ enum Status {
ERROR_EXISTS = 4;
ERROR_ACCESS_TOKEN = 5;
CHECKSUM_OK = 6;
+ ERROR_UNSUPPORTED = 7;
}
message PipelineAckProto {
@@ -164,6 +175,16 @@ message BlockOpResponseProto {
/** explanatory text which may be useful to log on the client side */
optional string message = 5;
+
+ /** If the server chooses to agree to the request of a client for
+ * short-circuit access, it will send a response message with the relevant
+ * file descriptors attached.
+ *
+ * In the body of the message, this version number will be set to the
+ * specific version number of the block data that the client is about to
+ * read.
+ */
+ optional uint32 shortCircuitAccessVersion = 6;
}
/**
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Fri Jan 11 23:52:22 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
/**
* A helper class to setup the cluster, and get to BlockReader and DataNode for a block.
@@ -156,7 +157,7 @@ public class BlockReaderTestUtil {
testBlock.getBlockToken(),
offset, lenToRead,
true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
- nodes[0]);
+ nodes[0], null, false);
}
/**
@@ -168,4 +169,12 @@ public class BlockReaderTestUtil {
return cluster.getDataNode(ipcport);
}
+ public boolean haveRequiredResources() {
+ if (conf.get(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY) != null) {
+ // To use UNIX Domain sockets, we must have the native code loaded.
+ return DomainSocket.getLoadingFailureReason() == null;
+ } else {
+ return true;
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Jan 11 23:52:22 2013
@@ -2189,8 +2189,8 @@ public class MiniDFSCluster {
/**
* Get file correpsonding to a block
* @param storageDir storage directory
- * @param blk block to be corrupted
- * @return file corresponding to the block
+ * @param blk the block
+ * @return data file corresponding to the block
*/
public static File getBlockFile(File storageDir, ExtendedBlock blk) {
return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()),
@@ -2198,6 +2198,19 @@ public class MiniDFSCluster {
}
/**
+ * Get the latest metadata file correpsonding to a block
+ * @param storageDir storage directory
+ * @param blk the block
+ * @return metadata file corresponding to the block
+ */
+ public static File getBlockMetadataFile(File storageDir, ExtendedBlock blk) {
+ return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()),
+ blk.getBlockName() + "_" + blk.getGenerationStamp() +
+ Block.METADATA_EXTENSION);
+
+ }
+
+ /**
* Shut down a cluster if it is not null
* @param cluster cluster reference or null
*/
@@ -2224,7 +2237,7 @@ public class MiniDFSCluster {
}
/**
- * Get files related to a block for a given datanode
+ * Get the block data file for a block from a given datanode
* @param dnIndex Index of the datanode to get block files for
* @param block block for which corresponding files are needed
*/
@@ -2239,6 +2252,24 @@ public class MiniDFSCluster {
}
return null;
}
+
+ /**
+ * Get the block metadata file for a block from a given datanode
+ *
+ * @param dnIndex Index of the datanode to get block files for
+ * @param block block for which corresponding files are needed
+ */
+ public static File getBlockMetadataFile(int dnIndex, ExtendedBlock block) {
+ // Check for block file in the two storage directories of the datanode
+ for (int i = 0; i <=1 ; i++) {
+ File storageDir = MiniDFSCluster.getStorageDir(dnIndex, i);
+ File blockMetaFile = getBlockMetadataFile(storageDir, block);
+ if (blockMetaFile.exists()) {
+ return blockMetaFile;
+ }
+ }
+ return null;
+ }
/**
* Throw an exception if the MiniDFSCluster is not started with a single
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Fri Jan 11 23:52:22 2013
@@ -17,90 +17,333 @@
*/
package org.apache.hadoop.hdfs;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
import org.junit.Test;
public class TestBlockReaderLocal {
- static MiniDFSCluster cluster;
- static HdfsConfiguration conf;
+ public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
+ int off2, int len) {
+ for (int i = 0; i < len; i++) {
+ if (buf1[off1 + i] != buf2[off2 + i]) {
+ Assert.fail("arrays differ at byte " + i + ". " +
+ "The first array has " + (int)buf1[off1 + i] +
+ ", but the second array has " + (int)buf2[off2 + i]);
+ }
+ }
+ }
- @BeforeClass
- public static void setupCluster() throws IOException {
- conf = new HdfsConfiguration();
+ /**
+ * Similar to IOUtils#readFully(). Reads bytes in a loop.
+ *
+ * @param reader The BlockReaderLocal to read bytes from
+ * @param buf The ByteBuffer to read into
+ * @param off The offset in the buffer to read into
+ * @param len The number of bytes to read.
+ *
+ * @throws IOException If it could not read the requested number of bytes
+ */
+ private static void readFully(BlockReaderLocal reader,
+ ByteBuffer buf, int off, int len) throws IOException {
+ int amt = len;
+ while (amt > 0) {
+ buf.limit(off + len);
+ buf.position(off);
+ long ret = reader.read(buf);
+ if (ret < 0) {
+ throw new EOFException( "Premature EOF from BlockReaderLocal " +
+ "after reading " + (len - amt) + " byte(s).");
+ }
+ amt -= ret;
+ off += ret;
+ }
+ }
+
+ private static interface BlockReaderLocalTest {
+ final int TEST_LENGTH = 12345;
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException;
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException;
+ }
+
+ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
+ boolean checksum) throws IOException {
+ MiniDFSCluster cluster = null;
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
+ conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+ FileInputStream dataIn = null, checkIn = null;
+ final Path TEST_PATH = new Path("/a");
+ final long RANDOM_SEED = 4567L;
+ BlockReaderLocal blockReaderLocal = null;
+ FSDataInputStream fsIn = null;
+ byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ IOUtils.readFully(fsIn, original, 0,
+ BlockReaderLocalTest.TEST_LENGTH);
+ fsIn.close();
+ fsIn = null;
+ ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+ File dataFile = MiniDFSCluster.getBlockFile(0, block);
+ File metaFile = MiniDFSCluster.getBlockMetadataFile(0, block);
+
+ DatanodeID datanodeID = cluster.getDataNodes().get(0).getDatanodeId();
+ cluster.shutdown();
+ cluster = null;
+ test.setup(dataFile, checksum);
+ dataIn = new FileInputStream(dataFile);
+ checkIn = new FileInputStream(metaFile);
+ blockReaderLocal = new BlockReaderLocal(conf,
+ TEST_PATH.getName(), block, 0, -1,
+ dataIn, checkIn, datanodeID, checksum);
+ dataIn = null;
+ checkIn = null;
+ test.doTest(blockReaderLocal, original);
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (cluster != null) cluster.shutdown();
+ if (dataIn != null) dataIn.close();
+ if (checkIn != null) checkIn.close();
+ if (blockReaderLocal != null) blockReaderLocal.close(null, null);
+ }
+ }
+
+ private static class TestBlockReaderLocalImmediateClose
+ implements BlockReaderLocalTest {
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException { }
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException { }
+ }
+
+ @Test
+ public void testBlockReaderLocalImmediateClose() throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true);
+ runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false);
+ }
+
+ private static class TestBlockReaderSimpleReads
+ implements BlockReaderLocalTest {
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException { }
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ byte buf[] = new byte[TEST_LENGTH];
+ reader.readFully(buf, 0, 512);
+ assertArrayRegionsEqual(original, 0, buf, 0, 512);
+ reader.readFully(buf, 512, 512);
+ assertArrayRegionsEqual(original, 512, buf, 512, 512);
+ reader.readFully(buf, 1024, 513);
+ assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
+ reader.readFully(buf, 1537, 514);
+ assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
+ }
+ }
+
+ @Test
+ public void testBlockReaderSimpleReads() throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true);
+ }
+
+ @Test
+ public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false);
+ }
+
+ private static class TestBlockReaderLocalArrayReads2
+ implements BlockReaderLocalTest {
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException { }
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ byte buf[] = new byte[TEST_LENGTH];
+ reader.readFully(buf, 0, 10);
+ assertArrayRegionsEqual(original, 0, buf, 0, 10);
+ reader.readFully(buf, 10, 100);
+ assertArrayRegionsEqual(original, 10, buf, 10, 100);
+ reader.readFully(buf, 110, 700);
+ assertArrayRegionsEqual(original, 110, buf, 110, 700);
+ reader.readFully(buf, 810, 1); // from offset 810 to offset 811
+ reader.readFully(buf, 811, 5);
+ assertArrayRegionsEqual(original, 811, buf, 811, 5);
+ reader.readFully(buf, 816, 900); // skip from offset 816 to offset 1716
+ reader.readFully(buf, 1716, 5);
+ assertArrayRegionsEqual(original, 1716, buf, 1716, 5);
+ }
+ }
+
+ @Test
+ public void testBlockReaderLocalArrayReads2() throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
+ true);
+ }
- conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
- conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ @Test
+ public void testBlockReaderLocalArrayReads2NoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
false);
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- UserGroupInformation.getCurrentUser().getShortUserName());
+ }
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ private static class TestBlockReaderLocalByteBufferReads
+ implements BlockReaderLocalTest {
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException { }
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+ readFully(reader, buf, 0, 10);
+ assertArrayRegionsEqual(original, 0, buf.array(), 0, 10);
+ readFully(reader, buf, 10, 100);
+ assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+ readFully(reader, buf, 110, 700);
+ assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+ reader.skip(1); // skip from offset 810 to offset 811
+ readFully(reader, buf, 811, 5);
+ assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+ }
+ }
+
+ @Test
+ public void testBlockReaderLocalByteBufferReads()
+ throws IOException {
+ runBlockReaderLocalTest(
+ new TestBlockReaderLocalByteBufferReads(), true);
}
- @AfterClass
- public static void teardownCluster() {
- cluster.shutdown();
+ @Test
+ public void testBlockReaderLocalByteBufferReadsNoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(
+ new TestBlockReaderLocalByteBufferReads(), false);
}
+
+ private static class TestBlockReaderLocalReadCorruptStart
+ implements BlockReaderLocalTest {
+ boolean usingChecksums = false;
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException {
+ RandomAccessFile bf = null;
+ this.usingChecksums = usingChecksums;
+ try {
+ bf = new RandomAccessFile(blockFile, "rw");
+ bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+ } finally {
+ if (bf != null) bf.close();
+ }
+ }
- /**
- * Test that, in the case of an error, the position and limit of a ByteBuffer
- * are left unchanged. This is not mandated by ByteBufferReadable, but clients
- * of this class might immediately issue a retry on failure, so it's polite.
- */
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ byte buf[] = new byte[TEST_LENGTH];
+ if (usingChecksums) {
+ try {
+ reader.readFully(buf, 0, 10);
+ Assert.fail("did not detect corruption");
+ } catch (IOException e) {
+ // expected
+ }
+ } else {
+ reader.readFully(buf, 0, 10);
+ }
+ }
+ }
+
@Test
- public void testStablePositionAfterCorruptRead() throws Exception {
- final short REPL_FACTOR = 1;
- final long FILE_LENGTH = 512L;
- cluster.waitActive();
- FileSystem fs = cluster.getFileSystem();
-
- Path path = new Path("/corrupted");
-
- DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L);
- DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
-
- ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
- int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
- assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted);
-
- FSDataInputStream dis = cluster.getFileSystem().open(path);
- ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH);
- boolean sawException = false;
- try {
- dis.read(buf);
- } catch (ChecksumException ex) {
- sawException = true;
+ public void testBlockReaderLocalReadCorruptStart()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true);
+ }
+
+ private static class TestBlockReaderLocalReadCorrupt
+ implements BlockReaderLocalTest {
+ boolean usingChecksums = false;
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException {
+ RandomAccessFile bf = null;
+ this.usingChecksums = usingChecksums;
+ try {
+ bf = new RandomAccessFile(blockFile, "rw");
+ bf.seek(1539);
+ bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+ } finally {
+ if (bf != null) bf.close();
+ }
}
- assertTrue(sawException);
- assertEquals(0, buf.position());
- assertEquals(buf.capacity(), buf.limit());
-
- dis = cluster.getFileSystem().open(path);
- buf.position(3);
- buf.limit(25);
- sawException = false;
- try {
- dis.read(buf);
- } catch (ChecksumException ex) {
- sawException = true;
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ byte buf[] = new byte[TEST_LENGTH];
+ try {
+ reader.readFully(buf, 0, 10);
+ assertArrayRegionsEqual(original, 0, buf, 0, 10);
+ reader.readFully(buf, 10, 100);
+ assertArrayRegionsEqual(original, 10, buf, 10, 100);
+ reader.readFully(buf, 110, 700);
+ assertArrayRegionsEqual(original, 110, buf, 110, 700);
+ reader.skip(1); // skip from offset 810 to offset 811
+ reader.readFully(buf, 811, 5);
+ assertArrayRegionsEqual(original, 811, buf, 811, 5);
+ reader.readFully(buf, 816, 900);
+ if (usingChecksums) {
+ // We should detect the corruption when using a checksum file.
+ Assert.fail("did not detect corruption");
+ }
+ } catch (ChecksumException e) {
+ if (!usingChecksums) {
+ Assert.fail("didn't expect to get ChecksumException: not " +
+ "using checksums.");
+ }
+ }
}
+ }
- assertTrue(sawException);
- assertEquals(3, buf.position());
- assertEquals(25, buf.limit());
+ @Test
+ public void testBlockReaderLocalReadCorrupt()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true);
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false);
}
}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java Fri Jan 11 23:52:22 2013
@@ -61,7 +61,7 @@ public class TestClientBlockVerification
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close(null);
+ reader.close(null, null);
}
/**
@@ -76,7 +76,7 @@ public class TestClientBlockVerification
// We asked the blockreader for the whole file, and only read
// half of it, so no CHECKSUM_OK
verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
- reader.close(null);
+ reader.close(null, null);
}
/**
@@ -92,7 +92,7 @@ public class TestClientBlockVerification
// And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close(null);
+ reader.close(null, null);
}
/**
@@ -111,7 +111,7 @@ public class TestClientBlockVerification
util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close(null);
+ reader.close(null, null);
}
}
}
Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java?rev=1432335&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java Fri Jan 11 23:52:22 2013
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.Test;
+
+public class TestFileInputStreamCache {
+ static final Log LOG = LogFactory.getLog(TestFileInputStreamCache.class);
+
+ @Test
+ public void testCreateAndDestroy() throws Exception {
+ FileInputStreamCache cache = new FileInputStreamCache(10, 1000);
+ cache.close();
+ }
+
+ private static class TestFileDescriptorPair {
+ TemporarySocketDirectory dir = new TemporarySocketDirectory();
+ FileInputStream fis[];
+
+ public TestFileDescriptorPair() throws IOException {
+ fis = new FileInputStream[2];
+ for (int i = 0; i < 2; i++) {
+ String name = dir.getDir() + "/file" + i;
+ FileOutputStream fos = new FileOutputStream(name);
+ fos.write(1);
+ fos.close();
+ fis[i] = new FileInputStream(name);
+ }
+ }
+
+ public FileInputStream[] getFileInputStreams() {
+ return fis;
+ }
+
+ public void close() throws IOException {
+ IOUtils.cleanup(LOG, fis);
+ dir.close();
+ }
+
+ public boolean compareWith(FileInputStream other[]) {
+ if ((other == null) || (fis == null)) {
+ return other == fis;
+ }
+ if (fis.length != other.length) return false;
+ for (int i = 0; i < fis.length; i++) {
+ if (fis[i] != other[i]) return false;
+ }
+ return true;
+ }
+ }
+
+ @Test
+ public void testAddAndRetrieve() throws Exception {
+ FileInputStreamCache cache = new FileInputStreamCache(1, 1000000);
+ DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost",
+ "xyzzy", 8080, 9090, 7070);
+ ExtendedBlock block = new ExtendedBlock("poolid", 123);
+ TestFileDescriptorPair pair = new TestFileDescriptorPair();
+ cache.put(dnId, block, pair.getFileInputStreams());
+ FileInputStream fis[] = cache.get(dnId, block);
+ Assert.assertTrue(pair.compareWith(fis));
+ pair.close();
+ cache.close();
+ }
+
+ @Test
+ public void testExpiry() throws Exception {
+ FileInputStreamCache cache = new FileInputStreamCache(1, 10);
+ DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost",
+ "xyzzy", 8080, 9090, 7070);
+ ExtendedBlock block = new ExtendedBlock("poolid", 123);
+ TestFileDescriptorPair pair = new TestFileDescriptorPair();
+ cache.put(dnId, block, pair.getFileInputStreams());
+ Thread.sleep(cache.getExpiryTimeMs() * 100);
+ FileInputStream fis[] = cache.get(dnId, block);
+ Assert.assertNull(fis);
+ pair.close();
+ cache.close();
+ }
+
+ @Test
+ public void testEviction() throws Exception {
+ FileInputStreamCache cache = new FileInputStreamCache(1, 10000000);
+ DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost",
+ "xyzzy", 8080, 9090, 7070);
+ ExtendedBlock block = new ExtendedBlock("poolid", 123);
+ TestFileDescriptorPair pair = new TestFileDescriptorPair();
+ cache.put(dnId, block, pair.getFileInputStreams());
+ DatanodeID dnId2 = new DatanodeID("127.0.0.1", "localhost",
+ "xyzzy", 8081, 9091, 7071);
+ TestFileDescriptorPair pair2 = new TestFileDescriptorPair();
+ cache.put(dnId2, block, pair2.getFileInputStreams());
+ FileInputStream fis[] = cache.get(dnId, block);
+ Assert.assertNull(fis);
+ FileInputStream fis2[] = cache.get(dnId2, block);
+ Assert.assertTrue(pair2.compareWith(fis2));
+ pair.close();
+ cache.close();
+ }
+}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java Fri Jan 11 23:52:22 2013
@@ -17,14 +17,10 @@
*/
package org.apache.hadoop.hdfs;
-import java.io.IOException;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Test;
public class TestParallelRead extends TestParallelReadUtil {
-
@BeforeClass
static public void setupCluster() throws Exception {
setupCluster(DEFAULT_REPLICATION_FACTOR, new HdfsConfiguration());
@@ -34,26 +30,4 @@ public class TestParallelRead extends Te
static public void teardownCluster() throws Exception {
TestParallelReadUtil.teardownCluster();
}
-
- /**
- * Do parallel read several times with different number of files and threads.
- *
- * Note that while this is the only "test" in a junit sense, we're actually
- * dispatching a lot more. Failures in the other methods (and other threads)
- * need to be manually collected, which is inconvenient.
- */
- @Test
- public void testParallelReadCopying() throws IOException {
- runTestWorkload(new CopyingReadWorkerHelper());
- }
-
- @Test
- public void testParallelReadByteBuffer() throws IOException {
- runTestWorkload(new DirectReadWorkerHelper());
- }
-
- @Test
- public void testParallelReadMixed() throws IOException {
- runTestWorkload(new MixedWorkloadHelper());
- }
}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java Fri Jan 11 23:52:22 2013
@@ -32,12 +32,18 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
+import org.junit.Assume;
+import org.junit.Ignore;
+import org.junit.Test;
/**
* Driver class for testing the use of DFSInputStream by multiple concurrent
- * readers, using the different read APIs. See subclasses for the actual test
- * cases.
+ * readers, using the different read APIs.
+ *
+ * This class is marked as @Ignore so that junit doesn't try to execute the
+ * tests in here directly. They are executed from subclasses.
*/
+@Ignore
public class TestParallelReadUtil {
static final Log LOG = LogFactory.getLog(TestParallelReadUtil.class);
@@ -386,4 +392,28 @@ public class TestParallelReadUtil {
util.shutdown();
}
+ /**
+ * Do parallel read several times with different number of files and threads.
+ *
+ * Note that while this is the only "test" in a junit sense, we're actually
+ * dispatching a lot more. Failures in the other methods (and other threads)
+ * need to be manually collected, which is inconvenient.
+ */
+ @Test
+ public void testParallelReadCopying() throws IOException {
+ Assume.assumeTrue(util.haveRequiredResources());
+ runTestWorkload(new CopyingReadWorkerHelper());
+ }
+
+ @Test
+ public void testParallelReadByteBuffer() throws IOException {
+ Assume.assumeTrue(util.haveRequiredResources());
+ runTestWorkload(new DirectReadWorkerHelper());
+ }
+
+ @Test
+ public void testParallelReadMixed() throws IOException {
+ Assume.assumeTrue(util.haveRequiredResources());
+ runTestWorkload(new MixedWorkloadHelper());
+ }
}
Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java?rev=1432335&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java Fri Jan 11 23:52:22 2013
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestParallelShortCircuitRead extends TestParallelReadUtil {
+ private static TemporarySocketDirectory sockDir;
+
+ @BeforeClass
+ static public void setupCluster() throws Exception {
+ sockDir = new TemporarySocketDirectory();
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+ conf.setBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+ DomainSocket.disableBindPathValidation();
+ setupCluster(1, conf);
+ }
+
+ @AfterClass
+ static public void teardownCluster() throws Exception {
+ sockDir.close();
+ TestParallelReadUtil.teardownCluster();
+ }
+}
\ No newline at end of file
Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java?rev=1432335&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java Fri Jan 11 23:52:22 2013
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestParallelShortCircuitReadNoChecksum extends TestParallelReadUtil {
+ private static TemporarySocketDirectory sockDir;
+
+ @BeforeClass
+ static public void setupCluster() throws Exception {
+ sockDir = new TemporarySocketDirectory();
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+ conf.setBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+ DomainSocket.disableBindPathValidation();
+ setupCluster(1, conf);
+ }
+
+ @AfterClass
+ static public void teardownCluster() throws Exception {
+ sockDir.close();
+ TestParallelReadUtil.teardownCluster();
+ }
+}
Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java?rev=1432335&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java Fri Jan 11 23:52:22 2013
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestParallelUnixDomainRead extends TestParallelReadUtil {
+ private static TemporarySocketDirectory sockDir;
+
+ @BeforeClass
+ static public void setupCluster() throws Exception {
+ sockDir = new TemporarySocketDirectory();
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+ DomainSocket.disableBindPathValidation();
+ setupCluster(1, conf);
+ }
+
+ @AfterClass
+ static public void teardownCluster() throws Exception {
+ sockDir.close();
+ TestParallelReadUtil.teardownCluster();
+ }
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Fri Jan 11 23:52:22 2013
@@ -20,9 +20,11 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertTrue;
import java.io.EOFException;
+import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -30,21 +32,22 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
/**
@@ -55,8 +58,18 @@ import org.junit.Test;
* system.
*/
public class TestShortCircuitLocalRead {
+ private static TemporarySocketDirectory sockDir;
- static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+ @BeforeClass
+ public static void init() {
+ sockDir = new TemporarySocketDirectory();
+ DomainSocket.disableBindPathValidation();
+ }
+
+ @AfterClass
+ public static void shutdown() throws IOException {
+ sockDir.close();
+ }
static final long seed = 0xDEADBEEFL;
static final int blockSize = 5120;
@@ -81,7 +94,9 @@ public class TestShortCircuitLocalRead {
for (int idx = 0; idx < len; idx++) {
if (expected[from + idx] != actual[idx]) {
Assert.fail(message + " byte " + (from + idx) + " differs. expected "
- + expected[from + idx] + " actual " + actual[idx]);
+ + expected[from + idx] + " actual " + actual[idx] +
+ "\nexpected: " + StringUtils.byteToHexString(expected, from, from + len) +
+ "\nactual: " + StringUtils.byteToHexString(actual, 0, len));
}
}
}
@@ -170,8 +185,9 @@ public class TestShortCircuitLocalRead {
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
ignoreChecksum);
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- UserGroupInformation.getCurrentUser().getShortUserName());
+ conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(),
+ "TestShortCircuitLocalRead.__PORT__.sock").getAbsolutePath());
if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf);
}
@@ -229,23 +245,17 @@ public class TestShortCircuitLocalRead {
doTestShortCircuitRead(false, 10*blockSize+100, 777);
doTestShortCircuitRead(true, 10*blockSize+100, 777);
}
-
+
@Test
- public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+ public void testDeprecatedGetBlockLocalPathInfoRpc()
+ throws IOException, InterruptedException {
final Configuration conf = new Configuration();
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- "alloweduser1,alloweduser2");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build();
cluster.waitActive();
- final DataNode dn = cluster.getDataNodes().get(0);
FileSystem fs = cluster.getFileSystem();
try {
DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
- UserGroupInformation aUgi1 =
- UserGroupInformation.createRemoteUser("alloweduser1");
- UserGroupInformation aUgi2 =
- UserGroupInformation.createRemoteUser("alloweduser2");
LocatedBlocks lb = cluster.getNameNode().getRpcServer()
.getBlockLocations("/tmp/x", 0, 16);
// Create a new block object, because the block inside LocatedBlock at
@@ -253,51 +263,11 @@ public class TestShortCircuitLocalRead {
ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
- ClientDatanodeProtocol proxy = aUgi1
- .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
- @Override
- public ClientDatanodeProtocol run() throws Exception {
- return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
- 60000, false);
- }
- });
-
- // This should succeed
- BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
- Assert.assertEquals(
- DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
- blpi.getBlockPath());
-
- // Try with the other allowed user
- proxy = aUgi2
- .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
- @Override
- public ClientDatanodeProtocol run() throws Exception {
- return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
- 60000, false);
- }
- });
-
- // This should succeed as well
- blpi = proxy.getBlockLocalPathInfo(blk, token);
- Assert.assertEquals(
- DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
- blpi.getBlockPath());
-
- // Now try with a disallowed user
- UserGroupInformation bUgi = UserGroupInformation
- .createRemoteUser("notalloweduser");
- proxy = bUgi
- .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
- @Override
- public ClientDatanodeProtocol run() throws Exception {
- return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
- 60000, false);
- }
- });
+ ClientDatanodeProtocol proxy =
+ DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
try {
proxy.getBlockLocalPathInfo(blk, token);
- Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+ Assert.fail("The call should have failed as this user "
+ " is not allowed to call getBlockLocalPathInfo");
} catch (IOException ex) {
Assert.assertTrue(ex.getMessage().contains(
@@ -315,8 +285,6 @@ public class TestShortCircuitLocalRead {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- UserGroupInformation.getCurrentUser().getShortUserName());
if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf);
}
@@ -354,6 +322,86 @@ public class TestShortCircuitLocalRead {
cluster.shutdown();
}
}
+
+ @Test
+ public void testHandleTruncatedBlockFile() throws IOException {
+ MiniDFSCluster cluster = null;
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+ conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+ final Path TEST_PATH = new Path("/a");
+ final Path TEST_PATH2 = new Path("/b");
+ final long RANDOM_SEED = 4567L;
+ final long RANDOM_SEED2 = 4568L;
+ FSDataInputStream fsIn = null;
+ final int TEST_LENGTH = 3456;
+
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_LENGTH, (short)1, RANDOM_SEED);
+ DFSTestUtil.createFile(fs, TEST_PATH2,
+ TEST_LENGTH, (short)1, RANDOM_SEED2);
+ fsIn = cluster.getFileSystem().open(TEST_PATH2);
+ byte original[] = new byte[TEST_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_LENGTH);
+ fsIn.close();
+ fsIn = null;
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+ File dataFile = MiniDFSCluster.getBlockFile(0, block);
+ cluster.shutdown();
+ cluster = null;
+ RandomAccessFile raf = null;
+ try {
+ raf = new RandomAccessFile(dataFile, "rw");
+ raf.setLength(0);
+ } finally {
+ if (raf != null) raf.close();
+ }
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ fsIn = fs.open(TEST_PATH);
+ try {
+ byte buf[] = new byte[100];
+ fsIn.seek(2000);
+ fsIn.readFully(buf, 0, buf.length);
+ Assert.fail("shouldn't be able to read from corrupt 0-length " +
+ "block file.");
+ } catch (IOException e) {
+ DFSClient.LOG.error("caught exception ", e);
+ }
+ fsIn.close();
+ fsIn = null;
+
+ // We should still be able to read the other file.
+ // This is important because it indicates that we detected that the
+ // previous block was corrupt, rather than blaming the problem on
+ // communication.
+ fsIn = fs.open(TEST_PATH2);
+ byte buf[] = new byte[original.length];
+ fsIn.readFully(buf, 0, buf.length);
+ TestBlockReaderLocal.assertArrayRegionsEqual(original, 0, buf, 0,
+ original.length);
+ fsIn.close();
+ fsIn = null;
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
/**
* Test to run benchmarks between shortcircuit read vs regular read with
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Fri Jan 11 23:52:22 2013
@@ -148,7 +148,7 @@ public class TestBlockTokenWithDFS {
blockReader = BlockReaderFactory.newBlockReader(
conf, file, block, lblock.getBlockToken(), 0, -1,
true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
- nodes[0]);
+ nodes[0], null, false);
} catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException) {
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Fri Jan 11 23:52:22 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -962,6 +963,12 @@ public class SimulatedFSDataset implemen
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Fri Jan 11 23:52:22 2013
@@ -32,6 +32,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -281,11 +282,11 @@ public class TestDataNodeVolumeFailure {
String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid",
block.getBlockId());
- BlockReaderFactory.newBlockReader(conf, file, block,
+ BlockReader blockReader =
+ BlockReaderFactory.newBlockReader(conf, file, block,
lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
- TcpPeerServer.peerFromSocket(s), datanode);
-
- // nothing - if it fails - it will throw and exception
+ TcpPeerServer.peerFromSocket(s), datanode, null, false);
+ blockReader.close(null, null);
}
/**