Posted to hdfs-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:09 UTC
svn commit: r1513258 [2/6] - in
/hadoop/common/branches/YARN-321/hadoop-hdfs-project: ./
hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/tomcat/ROOT/ hadoop-hdfs-nfs/
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/
hadoop-hdfs-nfs/src/m...
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Mon Aug 12 21:25:49 2013
@@ -36,16 +36,10 @@ import org.apache.hadoop.hdfs.protocol.C
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolPB;
-import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
@@ -66,8 +60,15 @@ import org.apache.hadoop.security.Refres
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolClientSideTranslatorPB;
+import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolPB;
+import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
+import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
+import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
+import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -297,9 +298,8 @@ public class NameNodeProxies {
return new ClientNamenodeProtocolTranslatorPB(proxy);
}
- @SuppressWarnings("unchecked")
private static Object createNameNodeProxy(InetSocketAddress address,
- Configuration conf, UserGroupInformation ugi, Class xface)
+ Configuration conf, UserGroupInformation ugi, Class<?> xface)
throws IOException {
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
@@ -308,7 +308,8 @@ public class NameNodeProxies {
}
/** Gets the configured Failover proxy provider's class */
- private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
+ @VisibleForTesting
+ public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
if (nameNodeUri == null) {
return null;
@@ -345,8 +346,8 @@ public class NameNodeProxies {
}
/** Creates the Failover proxy provider instance*/
- @SuppressWarnings("unchecked")
- private static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
+ @VisibleForTesting
+ public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
Class<T> xface, URI nameNodeUri) throws IOException {
Preconditions.checkArgument(
@@ -355,9 +356,9 @@ public class NameNodeProxies {
try {
Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
.getConstructor(Configuration.class, URI.class, Class.class);
- FailoverProxyProvider<?> provider = ctor.newInstance(conf, nameNodeUri,
+ FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
xface);
- return (FailoverProxyProvider<T>) provider;
+ return provider;
} catch (Exception e) {
String message = "Couldn't create proxy provider " + failoverProxyProviderClass;
if (LOG.isDebugEnabled()) {
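With getFailoverProxyProviderClass and createFailoverProxyProvider made public (and @VisibleForTesting) above, a test can exercise the reflection contract directly. A minimal sketch under those assumptions; the helper name is illustrative, not part of the commit:

  import java.io.IOException;
  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.NameNodeProxies;
  import org.apache.hadoop.hdfs.protocol.ClientProtocol;
  import org.apache.hadoop.io.retry.FailoverProxyProvider;

  // Hypothetical test helper, for illustration only.
  static FailoverProxyProvider<ClientProtocol> resolveProvider(
      Configuration conf, URI nameNodeUri) throws IOException {
    Class<FailoverProxyProvider<ClientProtocol>> providerClass =
        NameNodeProxies.getFailoverProxyProviderClass(conf, nameNodeUri,
            ClientProtocol.class);
    if (providerClass == null) {
      return null; // no failover provider configured for this URI
    }
    // Instantiated via the (Configuration, URI, Class) constructor located
    // by reflection; the unchecked cast is gone now that the provider is
    // constructed with its type parameter intact.
    return NameNodeProxies.createFailoverProxyProvider(conf, providerClass,
        ClientProtocol.class, nameNodeUri);
  }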
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Mon Aug 12 21:25:49 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@@ -381,13 +382,14 @@ public class RemoteBlockReader extends F
int bufferSize, boolean verifyChecksum,
String clientName, Peer peer,
DatanodeID datanodeID,
- PeerCache peerCache)
- throws IOException {
+ PeerCache peerCache,
+ CachingStrategy cachingStrategy)
+ throws IOException {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
- verifyChecksum);
+ verifyChecksum, cachingStrategy);
//
// Get bytes in block, set streams
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Mon Aug 12 21:25:49 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -375,12 +376,13 @@ public class RemoteBlockReader2 impleme
boolean verifyChecksum,
String clientName,
Peer peer, DatanodeID datanodeID,
- PeerCache peerCache) throws IOException {
+ PeerCache peerCache,
+ CachingStrategy cachingStrategy) throws IOException {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream()));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
- verifyChecksum);
+ verifyChecksum, cachingStrategy);
//
// Get bytes in block
Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Mon Aug 12 21:25:49 2013
@@ -38,9 +38,9 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.KerberosInfo;
@@ -140,7 +140,7 @@ public interface ClientProtocol {
* <p>
* Blocks have a maximum size. Clients that intend to create
* multi-block files must also use
- * {@link #addBlock(String, String, ExtendedBlock, DatanodeInfo[])}
+ * {@link #addBlock}
*
* @param src path of the file being created.
* @param masked masked permission.
@@ -171,7 +171,10 @@ public interface ClientProtocol {
*
* RuntimeExceptions:
* @throws InvalidPathException Path <code>src</code> is invalid
+ * <p>
+ * <em>Note that create with {@link CreateFlag#OVERWRITE} is idempotent.</em>
*/
+ @AtMostOnce
public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize)
@@ -205,6 +208,7 @@ public interface ClientProtocol {
* RuntimeExceptions:
* @throws UnsupportedOperationException if append is not supported
*/
+ @AtMostOnce
public LocatedBlock append(String src, String clientName)
throws AccessControlException, DSQuotaExceededException,
FileNotFoundException, SafeModeException, UnresolvedLinkException,
@@ -275,8 +279,8 @@ public interface ClientProtocol {
/**
* The client can give up on a block by calling abandonBlock().
- * The client can then
- * either obtain a new block, or complete or abandon the file.
+ * The client can then either obtain a new block, or complete or abandon the
+ * file.
* Any partial writes to the block will be discarded.
*
* @throws AccessControlException If access is denied
@@ -284,6 +288,7 @@ public interface ClientProtocol {
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
* @throws IOException If an I/O error occurred
*/
+ @Idempotent
public void abandonBlock(ExtendedBlock b, String src, String holder)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException;
@@ -409,6 +414,7 @@ public interface ClientProtocol {
* @throws SnapshotAccessControlException if path is in RO snapshot
* @throws IOException an I/O error occurred
*/
+ @AtMostOnce
public boolean rename(String src, String dst)
throws UnresolvedLinkException, SnapshotAccessControlException, IOException;
@@ -422,6 +428,7 @@ public interface ClientProtocol {
* contains a symlink
* @throws SnapshotAccessControlException if path is in RO snapshot
*/
+ @AtMostOnce
public void concat(String trg, String[] srcs)
throws IOException, UnresolvedLinkException, SnapshotAccessControlException;
@@ -460,6 +467,7 @@ public interface ClientProtocol {
* @throws SnapshotAccessControlException if path is in RO snapshot
* @throws IOException If an I/O error occurred
*/
+ @AtMostOnce
public void rename2(String src, String dst, Options.Rename... options)
throws AccessControlException, DSQuotaExceededException,
FileAlreadyExistsException, FileNotFoundException,
@@ -484,6 +492,7 @@ public interface ClientProtocol {
* @throws SnapshotAccessControlException if path is in RO snapshot
* @throws IOException If an I/O error occurred
*/
+ @AtMostOnce
public boolean delete(String src, boolean recursive)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, SnapshotAccessControlException, IOException;
@@ -704,6 +713,7 @@ public interface ClientProtocol {
* @throws AccessControlException if the superuser privilege is violated.
* @throws IOException if image creation failed.
*/
+ @AtMostOnce
public void saveNamespace() throws AccessControlException, IOException;
@@ -725,6 +735,7 @@ public interface ClientProtocol {
*
* @throws AccessControlException if the superuser privilege is violated.
*/
+ @Idempotent
public boolean restoreFailedStorage(String arg)
throws AccessControlException, IOException;
@@ -732,6 +743,7 @@ public interface ClientProtocol {
* Tells the namenode to reread the hosts and exclude files.
* @throws IOException
*/
+ @Idempotent
public void refreshNodes() throws IOException;
/**
@@ -741,6 +753,7 @@ public interface ClientProtocol {
*
* @throws IOException
*/
+ @Idempotent
public void finalizeUpgrade() throws IOException;
/**
@@ -763,6 +776,7 @@ public interface ClientProtocol {
*
* @throws IOException
*/
+ @Idempotent
public void metaSave(String filename) throws IOException;
/**
@@ -918,6 +932,7 @@ public interface ClientProtocol {
* @throws SnapshotAccessControlException if path is in RO snapshot
* @throws IOException If an I/O error occurred
*/
+ @AtMostOnce
public void createSymlink(String target, String link, FsPermission dirPerm,
boolean createParent) throws AccessControlException,
FileAlreadyExistsException, FileNotFoundException,
@@ -965,6 +980,7 @@ public interface ClientProtocol {
* @param newNodes datanodes in the pipeline
* @throws IOException if any error occurs
*/
+ @AtMostOnce
public void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes)
throws IOException;
@@ -997,6 +1013,7 @@ public interface ClientProtocol {
* @param token delegation token
* @throws IOException
*/
+ @Idempotent
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException;
@@ -1005,6 +1022,7 @@ public interface ClientProtocol {
* DataTransferProtocol to/from DataNodes.
* @throws IOException
*/
+ @Idempotent
public DataEncryptionKey getDataEncryptionKey() throws IOException;
/**
@@ -1014,6 +1032,7 @@ public interface ClientProtocol {
* @return the snapshot path.
* @throws IOException
*/
+ @AtMostOnce
public String createSnapshot(String snapshotRoot, String snapshotName)
throws IOException;
@@ -1023,6 +1042,7 @@ public interface ClientProtocol {
* @param snapshotName Name of the snapshot for the snapshottable directory
* @throws IOException
*/
+ @AtMostOnce
public void deleteSnapshot(String snapshotRoot, String snapshotName)
throws IOException;
@@ -1033,6 +1053,7 @@ public interface ClientProtocol {
* @param snapshotNewName new name of the snapshot
* @throws IOException
*/
+ @AtMostOnce
public void renameSnapshot(String snapshotRoot, String snapshotOldName,
String snapshotNewName) throws IOException;
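The @Idempotent / @AtMostOnce markers added above drive client-side retry behavior: idempotent calls may simply be re-sent, while at-most-once calls depend on the server-side retry cache (see the EDITLOG_SUPPORT_RETRYCACHE layout change below) to suppress duplicate execution after an HA failover. A minimal sketch of how a retry layer can consult the markers; the class and method names here are illustrative, the real logic lives in org.apache.hadoop.io.retry:

  import java.lang.reflect.Method;
  import org.apache.hadoop.io.retry.AtMostOnce;
  import org.apache.hadoop.io.retry.Idempotent;

  class RetryMarkers { // hypothetical helper, for illustration only
    static boolean retriableOnFailover(Method m) {
      // @Idempotent: safe to replay unconditionally.
      // @AtMostOnce: safe only because the NameNode replays the cached
      // response instead of re-executing the call.
      return m.isAnnotationPresent(Idempotent.class)
          || m.isAnnotationPresent(AtMostOnce.class);
    }
  }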
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Mon Aug 12 21:25:49 2013
@@ -327,7 +327,7 @@ public class DatanodeInfo extends Datano
* Check if the datanode is in stale state. Here if
* the namenode has not received heartbeat msg from a
* datanode for more than staleInterval (default value is
- * {@link DFSConfigKeys#DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT}),
+ * {@link DFSConfigKeys#DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT}),
* the datanode will be treated as stale node.
*
* @param staleInterval
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java Mon Aug 12 21:25:49 2013
@@ -104,7 +104,9 @@ public class LayoutVersion {
OPTIMIZE_SNAPSHOT_INODES(-45, -43,
"Reduce snapshot inode memory footprint", false),
SEQUENTIAL_BLOCK_ID(-46, "Allocate block IDs sequentially and store " +
- "block IDs in the edits log and image files");
+ "block IDs in the edits log and image files"),
+ EDITLOG_SUPPORT_RETRYCACHE(-47, "Record ClientId and CallId in editlog to "
+ + "enable rebuilding retry cache in case of HA failover");
final int lv;
final int ancestorLV;
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Mon Aug 12 21:25:49 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -57,13 +58,15 @@ public interface DataTransferProtocol {
* @param length maximum number of bytes for this read.
* @param sendChecksum if false, the DN should skip reading and sending
* checksums
+ * @param cachingStrategy The caching strategy to use.
*/
public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length,
- final boolean sendChecksum) throws IOException;
+ final boolean sendChecksum,
+ final CachingStrategy cachingStrategy) throws IOException;
/**
* Write a block to a datanode pipeline.
@@ -89,7 +92,8 @@ public interface DataTransferProtocol {
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
- final DataChecksum requestedChecksum) throws IOException;
+ final DataChecksum requestedChecksum,
+ final CachingStrategy cachingStrategy) throws IOException;
/**
* Transfer a block to another datanode.
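A caller-side sketch of the widened readBlock signature, using the CachingStrategy(Boolean dropBehind, Long readahead) constructor seen in Receiver below, where a null field means "fall back to the DataNode default"; variable names are illustrative:

  // Explicit strategy: drop pages behind the read, 4 MB of readahead.
  CachingStrategy strategy = new CachingStrategy(true, 4L * 1024 * 1024);
  new Sender(out).readBlock(blk, blockToken, clientName, 0L, length,
      true /* sendChecksum */, strategy);

  // Or keep the pre-change behavior:
  new Sender(out).readBlock(blk, blockToken, clientName, 0L, length,
      true, CachingStrategy.newDefaultStrategy());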
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Mon Aug 12 21:25:49 2013
@@ -31,8 +31,10 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
/** Receiver */
@InterfaceAudience.Private
@@ -85,6 +87,14 @@ public abstract class Receiver implement
}
}
+ static private CachingStrategy getCachingStrategy(CachingStrategyProto strategy) {
+ Boolean dropBehind = strategy.hasDropBehind() ?
+ strategy.getDropBehind() : null;
+ Long readahead = strategy.hasReadahead() ?
+ strategy.getReadahead() : null;
+ return new CachingStrategy(dropBehind, readahead);
+ }
+
/** Receive OP_READ_BLOCK */
private void opReadBlock() throws IOException {
OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
@@ -93,7 +103,10 @@ public abstract class Receiver implement
proto.getHeader().getClientName(),
proto.getOffset(),
proto.getLen(),
- proto.getSendChecksums());
+ proto.getSendChecksums(),
+ (proto.hasCachingStrategy() ?
+ getCachingStrategy(proto.getCachingStrategy()) :
+ CachingStrategy.newDefaultStrategy()));
}
/** Receive OP_WRITE_BLOCK */
@@ -108,7 +121,10 @@ public abstract class Receiver implement
proto.getPipelineSize(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
proto.getLatestGenerationStamp(),
- fromProto(proto.getRequestedChecksum()));
+ fromProto(proto.getRequestedChecksum()),
+ (proto.hasCachingStrategy() ?
+ getCachingStrategy(proto.getCachingStrategy()) :
+ CachingStrategy.newDefaultStrategy()));
}
/** Receive {@link Op#TRANSFER_BLOCK} */
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Mon Aug 12 21:25:49 2013
@@ -35,9 +35,11 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -72,19 +74,32 @@ public class Sender implements DataTrans
out.flush();
}
+ static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) {
+ CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder();
+ if (cachingStrategy.getReadahead() != null) {
+ builder.setReadahead(cachingStrategy.getReadahead().longValue());
+ }
+ if (cachingStrategy.getDropBehind() != null) {
+ builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue());
+ }
+ return builder.build();
+ }
+
@Override
public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length,
- final boolean sendChecksum) throws IOException {
+ final boolean sendChecksum,
+ final CachingStrategy cachingStrategy) throws IOException {
OpReadBlockProto proto = OpReadBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
.setOffset(blockOffset)
.setLen(length)
.setSendChecksums(sendChecksum)
+ .setCachingStrategy(getCachingStrategy(cachingStrategy))
.build();
send(out, Op.READ_BLOCK, proto);
@@ -102,7 +117,8 @@ public class Sender implements DataTrans
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
- DataChecksum requestedChecksum) throws IOException {
+ DataChecksum requestedChecksum,
+ final CachingStrategy cachingStrategy) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken);
@@ -117,7 +133,8 @@ public class Sender implements DataTrans
.setMinBytesRcvd(minBytesRcvd)
.setMaxBytesRcvd(maxBytesRcvd)
.setLatestGenerationStamp(latestGenerationStamp)
- .setRequestedChecksum(checksumProto);
+ .setRequestedChecksum(checksumProto)
+ .setCachingStrategy(getCachingStrategy(cachingStrategy));
if (source != null) {
proto.setSource(PBHelper.convertDatanodeInfo(source));
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Mon Aug 12 21:25:49 2013
@@ -707,11 +707,9 @@ public class ClientNamenodeProtocolServe
try {
HdfsFileStatus result = server.getFileLinkInfo(req.getSrc());
if (result != null) {
- System.out.println("got non null result for getFileLinkInfo for " + req.getSrc());
return GetFileLinkInfoResponseProto.newBuilder().setFs(
PBHelper.convert(result)).build();
} else {
- System.out.println("got null result for getFileLinkInfo for " + req.getSrc());
return VOID_GETFILELINKINFO_RESPONSE;
}
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Mon Aug 12 21:25:49 2013
@@ -1056,7 +1056,7 @@ public class PBHelper {
fs.getPath().toByteArray(),
fs.hasFileId()? fs.getFileId(): INodeId.GRANDFATHER_INODE_ID,
fs.hasLocations() ? PBHelper.convert(fs.getLocations()) : null,
- fs.hasChildrenNum() ? fs.getChildrenNum() : 0);
+ fs.hasChildrenNum() ? fs.getChildrenNum() : -1);
}
public static SnapshottableDirectoryStatus convert(
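The default for a missing childrenNum changes from 0 to -1, so "field not sent" (e.g. by an older server) is distinguishable from "empty directory". A hedged consumer-side sketch, assuming the usual getChildrenNum() accessor:

  int n = status.getChildrenNum();
  String children = (n < 0) ? "unknown" : Integer.toString(n);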
Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/overview.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Mon Aug 12 21:25:49 2013
@@ -22,7 +22,7 @@ import java.util.LinkedList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.hdfs.util.LightWeightGSet;
+import org.apache.hadoop.util.LightWeightGSet;
/**
* BlockInfo class maintains for a given block
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Aug 12 21:25:49 2013
@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
@@ -86,7 +87,7 @@ import com.google.common.collect.Sets;
public class BlockManager {
static final Log LOG = LogFactory.getLog(BlockManager.class);
- static final Log blockLog = NameNode.blockStateChangeLog;
+ public static final Log blockLog = NameNode.blockStateChangeLog;
/** Default load factor of map */
public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
@@ -216,6 +217,9 @@ public class BlockManager {
// whether or not to issue block encryption keys.
final boolean encryptDataTransfer;
+
+ // Max number of blocks to log info about during a block report.
+ private final long maxNumBlocksToLog;
/**
* When running inside a Standby node, the node may receive block reports
@@ -297,6 +301,10 @@ public class BlockManager {
conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
+ this.maxNumBlocksToLog =
+ conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
+ DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
+
LOG.info("defaultReplication = " + defaultReplication);
LOG.info("maxReplication = " + maxReplication);
LOG.info("minReplication = " + minReplication);
@@ -304,6 +312,7 @@ public class BlockManager {
LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
LOG.info("encryptDataTransfer = " + encryptDataTransfer);
+ LOG.info("maxNumBlocksToLog = " + maxNumBlocksToLog);
}
private static BlockTokenSecretManager createBlockTokenSecretManager(
@@ -314,6 +323,12 @@ public class BlockManager {
LOG.info(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY + "=" + isEnabled);
if (!isEnabled) {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ LOG.error("Security is enabled but block access tokens " +
+ "(via " + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY + ") " +
+ "aren't enabled. This may cause issues " +
+ "when clients attempt to talk to a DataNode.");
+ }
return null;
}
@@ -845,8 +860,10 @@ public class BlockManager {
public void setBlockToken(final LocatedBlock b,
final BlockTokenSecretManager.AccessMode mode) throws IOException {
if (isBlockTokenEnabled()) {
- b.setBlockToken(blockTokenSecretManager.generateToken(b.getBlock(),
- EnumSet.of(mode)));
+ // Use cached UGI if serving RPC calls.
+ b.setBlockToken(blockTokenSecretManager.generateToken(
+ NameNode.getRemoteUser().getShortUserName(),
+ b.getBlock(), EnumSet.of(mode)));
}
}
@@ -1319,7 +1336,7 @@ public class BlockManager {
// Move the block-replication into a "pending" state.
// The reason we use 'pending' is so we can retry
// replications that fail after an appropriate amount of time.
- pendingReplications.increment(block, targets.length);
+ pendingReplications.increment(block, targets);
if(blockLog.isDebugEnabled()) {
blockLog.debug(
"BLOCK* block " + block
@@ -1698,8 +1715,14 @@ public class BlockManager {
for (Block b : toRemove) {
removeStoredBlock(b, node);
}
+ int numBlocksLogged = 0;
for (BlockInfo b : toAdd) {
- addStoredBlock(b, node, null, true);
+ addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
+ numBlocksLogged++;
+ }
+ if (numBlocksLogged > maxNumBlocksToLog) {
+ blockLog.info("BLOCK* processReport: logged info for " + maxNumBlocksToLog
+ + " of " + numBlocksLogged + " reported.");
}
for (Block b : toInvalidate) {
blockLog.info("BLOCK* processReport: "
@@ -2599,6 +2622,8 @@ assert storedBlock.findDatanode(dn) < 0
void addBlock(DatanodeDescriptor node, Block block, String delHint)
throws IOException {
// decrement number of blocks scheduled to this datanode.
+ // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
+ // RECEIVED_BLOCK), we currently also decrease the approximate number.
node.decBlocksScheduled();
// get the deletion hint node
@@ -2614,7 +2639,7 @@ assert storedBlock.findDatanode(dn) < 0
//
// Modify the blocks->datanode map and node's map.
//
- pendingReplications.decrement(block);
+ pendingReplications.decrement(block, node);
processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
delHintNode);
}
@@ -2637,8 +2662,14 @@ assert storedBlock.findDatanode(dn) < 0
for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
}
+ long numBlocksLogged = 0;
for (BlockInfo b : toAdd) {
- addStoredBlock(b, node, delHintNode, true);
+ addStoredBlock(b, node, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+ numBlocksLogged++;
+ }
+ if (numBlocksLogged > maxNumBlocksToLog) {
+ blockLog.info("BLOCK* addBlock: logged info for " + maxNumBlocksToLog
+ + " of " + numBlocksLogged + " reported.");
}
for (Block b : toInvalidate) {
blockLog.info("BLOCK* addBlock: block "
@@ -2655,64 +2686,58 @@ assert storedBlock.findDatanode(dn) < 0
* The given node is reporting incremental information about some blocks.
* This includes blocks that are starting to be received, completed being
* received, or deleted.
+ *
+ * This method must be called with FSNamesystem lock held.
*/
- public void processIncrementalBlockReport(final DatanodeID nodeID,
- final String poolId,
- final ReceivedDeletedBlockInfo blockInfos[]
- ) throws IOException {
- namesystem.writeLock();
+ public void processIncrementalBlockReport(final DatanodeID nodeID,
+ final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+ throws IOException {
+ assert namesystem.hasWriteLock();
int received = 0;
int deleted = 0;
int receiving = 0;
- try {
- final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
- if (node == null || !node.isAlive) {
- blockLog
- .warn("BLOCK* processIncrementalBlockReport"
- + " is received from dead or unregistered node "
- + nodeID);
- throw new IOException(
- "Got incremental block report from unregistered or dead node");
- }
+ final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+ if (node == null || !node.isAlive) {
+ blockLog
+ .warn("BLOCK* processIncrementalBlockReport"
+ + " is received from dead or unregistered node "
+ + nodeID);
+ throw new IOException(
+ "Got incremental block report from unregistered or dead node");
+ }
- for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
- switch (rdbi.getStatus()) {
- case DELETED_BLOCK:
- removeStoredBlock(rdbi.getBlock(), node);
- deleted++;
- break;
- case RECEIVED_BLOCK:
- addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
- received++;
- break;
- case RECEIVING_BLOCK:
- receiving++;
- processAndHandleReportedBlock(node, rdbi.getBlock(),
- ReplicaState.RBW, null);
- break;
- default:
- String msg =
- "Unknown block status code reported by " + nodeID +
- ": " + rdbi;
- blockLog.warn(msg);
- assert false : msg; // if assertions are enabled, throw.
- break;
- }
- if (blockLog.isDebugEnabled()) {
- blockLog.debug("BLOCK* block "
- + (rdbi.getStatus()) + ": " + rdbi.getBlock()
- + " is received from " + nodeID);
- }
+ for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
+ switch (rdbi.getStatus()) {
+ case DELETED_BLOCK:
+ removeStoredBlock(rdbi.getBlock(), node);
+ deleted++;
+ break;
+ case RECEIVED_BLOCK:
+ addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
+ received++;
+ break;
+ case RECEIVING_BLOCK:
+ receiving++;
+ processAndHandleReportedBlock(node, rdbi.getBlock(),
+ ReplicaState.RBW, null);
+ break;
+ default:
+ String msg =
+ "Unknown block status code reported by " + nodeID +
+ ": " + rdbi;
+ blockLog.warn(msg);
+ assert false : msg; // if assertions are enabled, throw.
+ break;
+ }
+ if (blockLog.isDebugEnabled()) {
+ blockLog.debug("BLOCK* block "
+ + (rdbi.getStatus()) + ": " + rdbi.getBlock()
+ + " is received from " + nodeID);
}
- } finally {
- namesystem.writeUnlock();
- blockLog
- .debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
- + nodeID
- + " receiving: " + receiving + ", "
- + " received: " + received + ", "
- + " deleted: " + deleted);
}
+ blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
+ + nodeID + " receiving: " + receiving + ", " + " received: " + received
+ + ", " + " deleted: " + deleted);
}
/**
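Since processIncrementalBlockReport now asserts namesystem.hasWriteLock() instead of taking the lock itself, callers are expected to follow the usual pattern. A sketch of the caller side (the actual call site lives in the NameNode RPC layer):

  namesystem.writeLock();
  try {
    blockManager.processIncrementalBlockReport(nodeID, poolId, blockInfos);
  } finally {
    namesystem.writeUnlock();
  }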
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Mon Aug 12 21:25:49 2013
@@ -20,8 +20,8 @@ package org.apache.hadoop.hdfs.server.bl
import java.util.Iterator;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.util.GSet;
-import org.apache.hadoop.hdfs.util.LightWeightGSet;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LightWeightGSet;
/**
* This class maintains the map from a block to its metadata.
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Mon Aug 12 21:25:49 2013
@@ -22,8 +22,10 @@ import static org.apache.hadoop.util.Tim
import java.io.PrintWriter;
import java.sql.Time;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -71,14 +73,16 @@ class PendingReplicationBlocks {
/**
* Add a block to the list of pending Replications
+ * @param block The corresponding block
+ * @param targets The DataNodes where replicas of the block should be placed
*/
- void increment(Block block, int numReplicas) {
+ void increment(Block block, DatanodeDescriptor[] targets) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
if (found == null) {
- pendingReplications.put(block, new PendingBlockInfo(numReplicas));
+ pendingReplications.put(block, new PendingBlockInfo(targets));
} else {
- found.incrementReplicas(numReplicas);
+ found.incrementReplicas(targets);
found.setTimeStamp();
}
}
@@ -88,15 +92,17 @@ class PendingReplicationBlocks {
* One replication request for this block has finished.
* Decrement the number of pending replication requests
* for this block.
+ *
+ * @param dn The DataNode that finishes the replication
*/
- void decrement(Block block) {
+ void decrement(Block block, DatanodeDescriptor dn) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
if (found != null) {
if(LOG.isDebugEnabled()) {
LOG.debug("Removing pending replication for " + block);
}
- found.decrementReplicas();
+ found.decrementReplicas(dn);
if (found.getNumReplicas() <= 0) {
pendingReplications.remove(block);
}
@@ -153,7 +159,7 @@ class PendingReplicationBlocks {
return null;
}
Block[] blockList = timedOutItems.toArray(
- new Block[timedOutItems.size()]);
+ new Block[timedOutItems.size()]);
timedOutItems.clear();
return blockList;
}
@@ -163,16 +169,17 @@ class PendingReplicationBlocks {
* An object that contains information about a block that
* is being replicated. It records the timestamp when the
* system started replicating the most recent copy of this
- * block. It also records the number of replication
- * requests that are in progress.
+ * block. It also records the list of Datanodes where the
+ * replication requests are in progress.
*/
static class PendingBlockInfo {
private long timeStamp;
- private int numReplicasInProgress;
+ private final List<DatanodeDescriptor> targets;
- PendingBlockInfo(int numReplicas) {
+ PendingBlockInfo(DatanodeDescriptor[] targets) {
this.timeStamp = now();
- this.numReplicasInProgress = numReplicas;
+ this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
+ : new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));
}
long getTimeStamp() {
@@ -183,17 +190,20 @@ class PendingReplicationBlocks {
timeStamp = now();
}
- void incrementReplicas(int increment) {
- numReplicasInProgress += increment;
+ void incrementReplicas(DatanodeDescriptor... newTargets) {
+ if (newTargets != null) {
+ for (DatanodeDescriptor dn : newTargets) {
+ targets.add(dn);
+ }
+ }
}
- void decrementReplicas() {
- numReplicasInProgress--;
- assert(numReplicasInProgress >= 0);
+ void decrementReplicas(DatanodeDescriptor dn) {
+ targets.remove(dn);
}
int getNumReplicas() {
- return numReplicasInProgress;
+ return targets.size();
}
}
@@ -274,7 +284,7 @@ class PendingReplicationBlocks {
out.println(block +
" StartTime: " + new Time(pendingBlock.timeStamp) +
" NumReplicaInProgress: " +
- pendingBlock.numReplicasInProgress);
+ pendingBlock.getNumReplicas());
}
}
}
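With per-target tracking, a completed replication is now decremented for the specific DataNode that finished rather than by a bare count, so a straggling target no longer masks a timed-out one. A usage sketch (variable names illustrative):

  // When scheduling: record which DataNodes were asked to replicate.
  pendingReplications.increment(block, targets);

  // When one target reports the block received: clear only that request.
  pendingReplications.decrement(block, reportingNode);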
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Mon Aug 12 21:25:49 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
@@ -217,7 +218,7 @@ public class JspHelper {
"JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
new DatanodeID(addr.getAddress().getHostAddress(),
addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
- null, null, false);
+ null, null, false, CachingStrategy.newDefaultStrategy());
final byte[] buf = new byte[amtToRead];
int readOffset = 0;
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Mon Aug 12 21:25:49 2013
@@ -48,9 +48,9 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.hdfs.util.GSet;
-import org.apache.hadoop.hdfs.util.LightWeightGSet;
-import org.apache.hadoop.hdfs.util.LightWeightGSet.LinkedElement;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LightWeightGSet;
+import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Time;
@@ -417,7 +417,7 @@ class BlockPoolSliceScanner {
adjustThrottler();
blockSender = new BlockSender(block, 0, -1, false, true, true,
- datanode, null);
+ datanode, null, CachingStrategy.newDropBehind());
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Aug 12 21:25:49 2013
@@ -51,6 +51,9 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
/** A class that receives a block and writes to its own disk, meanwhile
* may copies it to another site. If a throttler is provided,
@@ -60,7 +63,8 @@ class BlockReceiver implements Closeable
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
- private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
+ @VisibleForTesting
+ static long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
private DataInputStream in = null; // from where data are read
private DataChecksum clientChecksum; // checksum used by client
@@ -96,8 +100,8 @@ class BlockReceiver implements Closeable
// Cache management state
private boolean dropCacheBehindWrites;
+ private long lastCacheManagementOffset = 0;
private boolean syncBehindWrites;
- private long lastCacheDropOffset = 0;
/** The client name. It is empty if a datanode is the client */
private final String clientname;
@@ -119,8 +123,8 @@ class BlockReceiver implements Closeable
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
- final DataNode datanode, DataChecksum requestedChecksum)
- throws IOException {
+ final DataNode datanode, DataChecksum requestedChecksum,
+ CachingStrategy cachingStrategy) throws IOException {
try{
this.block = block;
this.in = in;
@@ -145,6 +149,7 @@ class BlockReceiver implements Closeable
+ "\n isClient =" + isClient + ", clientname=" + clientname
+ "\n isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
+ "\n inAddr=" + inAddr + ", myAddr=" + myAddr
+ + "\n cachingStrategy = " + cachingStrategy
);
}
@@ -191,7 +196,9 @@ class BlockReceiver implements Closeable
" while receiving block " + block + " from " + inAddr);
}
}
- this.dropCacheBehindWrites = datanode.getDnConf().dropCacheBehindWrites;
+ this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ?
+ datanode.getDnConf().dropCacheBehindWrites :
+ cachingStrategy.getDropBehind();
this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
final boolean isCreate = isDatanode || isTransfer
@@ -597,7 +604,7 @@ class BlockReceiver implements Closeable
datanode.metrics.incrBytesWritten(len);
- dropOsCacheBehindWriter(offsetInBlock);
+ manageWriterOsCache(offsetInBlock);
}
} catch (IOException iex) {
datanode.checkDiskError(iex);
@@ -619,25 +626,44 @@ class BlockReceiver implements Closeable
return lastPacketInBlock?-1:len;
}
- private void dropOsCacheBehindWriter(long offsetInBlock) {
+ private void manageWriterOsCache(long offsetInBlock) {
try {
if (outFd != null &&
- offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
- long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES;
- if (twoWindowsAgo > 0 && dropCacheBehindWrites) {
- NativeIO.POSIX.posixFadviseIfPossible(outFd, 0, lastCacheDropOffset,
- NativeIO.POSIX.POSIX_FADV_DONTNEED);
- }
-
+ offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
+ //
+ // For SYNC_FILE_RANGE_WRITE, we want to sync from
+ // lastCacheManagementOffset to a position "two windows ago"
+ //
+ // <========= sync ===========>
+ // +-----------------------O--------------------------X
+ // start last curPos
+ // of file
+ //
if (syncBehindWrites) {
- NativeIO.POSIX.syncFileRangeIfPossible(outFd, lastCacheDropOffset, CACHE_DROP_LAG_BYTES,
+ NativeIO.POSIX.syncFileRangeIfPossible(outFd,
+ lastCacheManagementOffset,
+ offsetInBlock - lastCacheManagementOffset,
NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
}
-
- lastCacheDropOffset += CACHE_DROP_LAG_BYTES;
+ //
+ // For POSIX_FADV_DONTNEED, we want to drop from the beginning
+ // of the file to a position prior to the current position.
+ //
+ // <=== drop =====>
+ // <---W--->
+ // +--------------+--------O--------------------------X
+ // start dropPos last curPos
+ // of file
+ //
+ long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
+ if (dropPos > 0 && dropCacheBehindWrites) {
+ NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
+ outFd, 0, dropPos, NativeIO.POSIX.POSIX_FADV_DONTNEED);
+ }
+ lastCacheManagementOffset = offsetInBlock;
}
} catch (Throwable t) {
- LOG.warn("Couldn't drop os cache behind writer for " + block, t);
+ LOG.warn("Error managing cache for writer of block " + block, t);
}
}
@@ -706,7 +732,13 @@ class BlockReceiver implements Closeable
}
if (responder != null) {
try {
- responder.join();
+ responder.join(datanode.getDnConf().getXceiverStopTimeout());
+ if (responder.isAlive()) {
+ String msg = "Join on responder thread " + responder
+ + " timed out";
+ LOG.warn(msg + "\n" + StringUtils.getStackTrace(responder));
+ throw new IOException(msg);
+ }
} catch (InterruptedException e) {
responder.interrupt();
throw new IOException("Interrupted receiveBlock");
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Mon Aug 12 21:25:49 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.io.nativeio.Nat
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -141,13 +142,22 @@ class BlockSender implements java.io.Clo
// Cache-management related fields
private final long readaheadLength;
- private boolean shouldDropCacheBehindRead;
+
private ReadaheadRequest curReadahead;
+
+ private final boolean alwaysReadahead;
+
+ private final boolean dropCacheBehindLargeReads;
+
+ private final boolean dropCacheBehindAllReads;
+
private long lastCacheDropOffset;
- private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+
+ @VisibleForTesting
+ static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+
/**
- * Minimum length of read below which management of the OS
- * buffer cache is disabled.
+ * See {@link BlockSender#isLongRead()}.
*/
private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
@@ -167,16 +177,42 @@ class BlockSender implements java.io.Clo
*/
BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean verifyChecksum,
- boolean sendChecksum,
- DataNode datanode, String clientTraceFmt)
+ boolean sendChecksum, DataNode datanode, String clientTraceFmt,
+ CachingStrategy cachingStrategy)
throws IOException {
try {
this.block = block;
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
this.clientTraceFmt = clientTraceFmt;
- this.readaheadLength = datanode.getDnConf().readaheadLength;
- this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
+
+ /*
+ * If the client asked for the cache to be dropped behind all reads,
+ * we honor that. Otherwise, we use the DataNode defaults.
+ * When using DataNode defaults, we use a heuristic where we only
+ * drop the cache for large reads.
+ */
+ if (cachingStrategy.getDropBehind() == null) {
+ this.dropCacheBehindAllReads = false;
+ this.dropCacheBehindLargeReads =
+ datanode.getDnConf().dropCacheBehindReads;
+ } else {
+ this.dropCacheBehindAllReads =
+ this.dropCacheBehindLargeReads =
+ cachingStrategy.getDropBehind().booleanValue();
+ }
+ /*
+ * Similarly, if readahead was explicitly requested, we always do it.
+ * Otherwise, we read ahead based on the DataNode settings, and only
+ * when the reads are large.
+ */
+ if (cachingStrategy.getReadahead() == null) {
+ this.alwaysReadahead = false;
+ this.readaheadLength = datanode.getDnConf().readaheadLength;
+ } else {
+ this.alwaysReadahead = true;
+ this.readaheadLength = cachingStrategy.getReadahead().longValue();
+ }
this.datanode = datanode;
if (verifyChecksum) {
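
The resolution rule in this constructor: a null field in the client's CachingStrategy means "fall back to the DataNode default heuristic", while a non-null value is honored unconditionally. A compact sketch of that null-means-default resolution, with illustrative names standing in for DNConf:

    class ResolvedReadPolicy {
      final boolean alwaysReadahead;
      final long readaheadLength;
      final boolean dropBehindAllReads;
      final boolean dropBehindLargeReads;

      ResolvedReadPolicy(Boolean requestedDropBehind, Long requestedReadahead,
          boolean defaultDropBehind, long defaultReadahead) {
        if (requestedDropBehind == null) {
          dropBehindAllReads = false;               // heuristic: large reads only
          dropBehindLargeReads = defaultDropBehind;
        } else {
          dropBehindAllReads = dropBehindLargeReads = requestedDropBehind;
        }
        if (requestedReadahead == null) {
          alwaysReadahead = false;                  // readahead for long reads only
          readaheadLength = defaultReadahead;
        } else {
          alwaysReadahead = true;                   // explicit request always wins
          readaheadLength = requestedReadahead;
        }
      }
    }
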
@@ -335,10 +371,11 @@ class BlockSender implements java.io.Clo
*/
@Override
public void close() throws IOException {
- if (blockInFd != null && shouldDropCacheBehindRead && isLongRead()) {
- // drop the last few MB of the file from cache
+ if (blockInFd != null &&
+ ((dropCacheBehindAllReads) ||
+ (dropCacheBehindLargeReads && isLongRead()))) {
try {
- NativeIO.POSIX.posixFadviseIfPossible(
+ NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
NativeIO.POSIX.POSIX_FADV_DONTNEED);
} catch (Exception e) {
@@ -637,7 +674,7 @@ class BlockSender implements java.io.Clo
if (isLongRead() && blockInFd != null) {
// Advise that this file descriptor will be accessed sequentially.
- NativeIO.POSIX.posixFadviseIfPossible(
+ NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
blockInFd, 0, 0, NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
}
@@ -705,37 +742,47 @@ class BlockSender implements java.io.Clo
* and drop-behind.
*/
private void manageOsCache() throws IOException {
- if (!isLongRead() || blockInFd == null) {
- // don't manage cache manually for short-reads, like
- // HBase random read workloads.
- return;
- }
+ // We can't manage the cache for this block if we don't have a file
+ // descriptor to work with.
+ if (blockInFd == null) return;
// Perform readahead if necessary
- if (readaheadLength > 0 && datanode.readaheadPool != null) {
+ if ((readaheadLength > 0) && (datanode.readaheadPool != null) &&
+ (alwaysReadahead || isLongRead())) {
curReadahead = datanode.readaheadPool.readaheadStream(
- clientTraceFmt, blockInFd,
- offset, readaheadLength, Long.MAX_VALUE,
+ clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,
curReadahead);
}
// Drop what we've just read from cache, since we aren't
// likely to need it again
- long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
- if (shouldDropCacheBehindRead &&
- offset >= nextCacheDropOffset) {
- long dropLength = offset - lastCacheDropOffset;
- if (dropLength >= 1024) {
- NativeIO.POSIX.posixFadviseIfPossible(blockInFd,
- lastCacheDropOffset, dropLength,
+ if (dropCacheBehindAllReads ||
+ (dropCacheBehindLargeReads && isLongRead())) {
+ long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
+ if (offset >= nextCacheDropOffset) {
+ long dropLength = offset - lastCacheDropOffset;
+ NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
+ blockInFd, lastCacheDropOffset, dropLength,
NativeIO.POSIX.POSIX_FADV_DONTNEED);
+ lastCacheDropOffset = offset;
}
- lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES;
}
}
+ /**
+ * Returns true if we have done a long enough read for this block to qualify
+ * for the DataNode-wide cache management defaults. We avoid applying the
+ * cache management defaults to smaller reads because the overhead would be
+ * too high.
+ *
+ * Note that if the client explicitly asked for dropBehind, we will do it
+ * even on short reads.
+ *
+ * This is also used to determine when to invoke
+ * posix_fadvise(POSIX_FADV_SEQUENTIAL).
+ */
private boolean isLongRead() {
- return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
+ return (endOffset - initialOffset) > LONG_READ_THRESHOLD_BYTES;
}
/**
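
Note the subtle fix in isLongRead(): the old test used the bytes remaining, so a long read stopped qualifying as it approached its end; the new test uses the total requested span, which stays constant for the whole transfer. Illustratively:

    long initialOffset = 0;
    long endOffset = 10L * 1024 * 1024;     // a 10 MB read
    long offset = endOffset - 100;          // near the end of the read
    long threshold = 256 * 1024;            // LONG_READ_THRESHOLD_BYTES
    boolean oldTest = (endOffset - offset) > threshold;        // false near EOF
    boolean newTest = (endOffset - initialOffset) > threshold; // true throughout
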
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Mon Aug 12 21:25:49 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import org.apache.hadoop.classification.InterfaceAudience;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
@@ -29,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY;
@@ -44,7 +47,8 @@ import org.apache.hadoop.hdfs.server.com
* Simple class encapsulating all of the configuration that the DataNode
* loads at startup time.
*/
-class DNConf {
+@InterfaceAudience.Private
+public class DNConf {
final int socketTimeout;
final int socketWriteTimeout;
final int socketKeepaliveTimeout;
@@ -66,6 +70,8 @@ class DNConf {
final String minimumNameNodeVersion;
final String encryptionAlgorithm;
+
+ final long xceiverStopTimeout;
public DNConf(Configuration conf) {
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -127,10 +133,18 @@ class DNConf {
this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
+
+ this.xceiverStopTimeout = conf.getLong(
+ DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
+ DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
}
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
String getMinimumNameNodeVersion() {
return this.minimumNameNodeVersion;
}
+
+ public long getXceiverStopTimeout() {
+ return xceiverStopTimeout;
+ }
}
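
DNConf follows a load-once pattern: each setting is read from the Configuration in the constructor and exposed through a final field or getter, so callers never re-parse. A minimal sketch of the new timeout's path through that pattern; the literal key string and default below are assumptions for illustration (the commit references them via DFSConfigKeys constants):

    import org.apache.hadoop.conf.Configuration;

    class XceiverStopConfSketch {
      private final long xceiverStopTimeout;

      XceiverStopConfSketch(Configuration conf) {
        // Assumed key and default; the real values come from DFSConfigKeys.
        xceiverStopTimeout = conf.getLong(
            "dfs.datanode.xceiver.stop.timeout.millis", 60000L);
      }

      public long getXceiverStopTimeout() {
        return xceiverStopTimeout;
      }
    }
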
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Aug 12 21:25:49 2013
@@ -171,6 +171,7 @@ import org.apache.hadoop.util.DiskChecke
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
@@ -284,6 +285,8 @@ public class DataNode extends Configured
// For InterDataNodeProtocol
public RPC.Server ipcServer;
+ private JvmPauseMonitor pauseMonitor;
+
private SecureResources secureResources = null;
private AbstractList<File> dataDirs;
private Configuration conf;
@@ -399,7 +402,7 @@ public class DataNode extends Configured
InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 0));
Configuration sslConf = new HdfsConfiguration(false);
- sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
+ sslConf.addResource(conf.get(DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
"ssl-server.xml"));
this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
if(LOG.isDebugEnabled()) {
@@ -447,10 +450,15 @@ public class DataNode extends Configured
new ClientDatanodeProtocolServerSideTranslatorPB(this);
BlockingService service = ClientDatanodeProtocolService
.newReflectiveBlockingService(clientDatanodeProtocolXlator);
- ipcServer = RPC.getServer(ClientDatanodeProtocolPB.class, service, ipcAddr
- .getHostName(), ipcAddr.getPort(), conf.getInt(
- DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT),
- false, conf, blockPoolTokenSecretManager);
+ ipcServer = new RPC.Builder(conf)
+ .setProtocol(ClientDatanodeProtocolPB.class)
+ .setInstance(service)
+ .setBindAddress(ipcAddr.getHostName())
+ .setPort(ipcAddr.getPort())
+ .setNumHandlers(
+ conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
+ DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
+ .setSecretManager(blockPoolTokenSecretManager).build();
InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
new InterDatanodeProtocolServerSideTranslatorPB(this);
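
The switch from the positional RPC.getServer(...) overload to RPC.Builder is purely structural: each setter names the argument it carries, so the call site stays readable as parameters accumulate. The skeleton, with placeholder values and only the setters that appear in this hunk:

    RPC.Server server = new RPC.Builder(conf)
        .setProtocol(ClientDatanodeProtocolPB.class)
        .setInstance(service)
        .setBindAddress("0.0.0.0")   // placeholder bind address
        .setPort(0)                  // 0 = let the OS pick a port
        .setNumHandlers(3)           // placeholder handler count
        .setVerbose(false)
        .setSecretManager(secretManager)
        .build();
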
@@ -739,6 +747,8 @@ public class DataNode extends Configured
registerMXBean();
initDataXceiver(conf);
startInfoServer(conf);
+ pauseMonitor = new JvmPauseMonitor(conf);
+ pauseMonitor.start();
// BlockPoolTokenSecretManager is required to create ipc server.
this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
@@ -977,7 +987,8 @@ public class DataNode extends Configured
* @return BP registration object
* @throws IOException
*/
- DatanodeRegistration getDNRegistrationForBP(String bpid)
+ @VisibleForTesting
+ public DatanodeRegistration getDNRegistrationForBP(String bpid)
throws IOException {
BPOfferService bpos = blockPoolManager.get(bpid);
if(bpos==null || bpos.bpRegistration==null) {
@@ -1221,6 +1232,9 @@ public class DataNode extends Configured
if (ipcServer != null) {
ipcServer.stop();
}
+ if (pauseMonitor != null) {
+ pauseMonitor.stop();
+ }
if (dataXceiverServer != null) {
((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
@@ -1511,6 +1525,7 @@ public class DataNode extends Configured
final BlockConstructionStage stage;
final private DatanodeRegistration bpReg;
final String clientname;
+ final CachingStrategy cachingStrategy;
/**
* Connect to the first item in the target list. Pass along the
@@ -1531,6 +1546,8 @@ public class DataNode extends Configured
BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
bpReg = bpos.bpRegistration;
this.clientname = clientname;
+ this.cachingStrategy =
+ new CachingStrategy(true, getDnConf().readaheadLength);
}
/**
@@ -1573,7 +1590,7 @@ public class DataNode extends Configured
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
blockSender = new BlockSender(b, 0, b.getNumBytes(),
- false, false, true, DataNode.this, null);
+ false, false, true, DataNode.this, null, cachingStrategy);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
//
@@ -1586,7 +1603,7 @@ public class DataNode extends Configured
}
new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
- stage, 0, 0, 0, 0, blockSender.getChecksum());
+ stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
// send data & checksum
blockSender.sendBlock(out, unbufOut, null);
@@ -2436,7 +2453,7 @@ public class DataNode extends Configured
return dxcs.balanceThrottler.getBandwidth();
}
- DNConf getDnConf() {
+ public DNConf getDnConf() {
return dnConf;
}
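
The JvmPauseMonitor additions scattered through this file pair up: construct-and-start during startup, null-guarded stop during shutdown. Pulled together as a sketch (class and method names here are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.JvmPauseMonitor;

    class MonitoredServiceSketch {
      private JvmPauseMonitor pauseMonitor;

      void start(Configuration conf) {
        pauseMonitor = new JvmPauseMonitor(conf);
        pauseMonitor.start();  // logs a warning whenever the JVM stalls (e.g. GC)
      }

      void shutdown() {
        if (pauseMonitor != null) {  // startup may have failed before creation
          pauseMonitor.stop();
        }
      }
    }
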
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Aug 12 21:25:49 2013
@@ -299,7 +299,8 @@ class DataXceiver extends Receiver imple
final String clientName,
final long blockOffset,
final long length,
- final boolean sendChecksum) throws IOException {
+ final boolean sendChecksum,
+ final CachingStrategy cachingStrategy) throws IOException {
previousOpClientName = clientName;
OutputStream baseStream = getOutputStream();
@@ -324,7 +325,8 @@ class DataXceiver extends Receiver imple
try {
try {
blockSender = new BlockSender(block, blockOffset, length,
- true, false, sendChecksum, datanode, clientTraceFmt);
+ true, false, sendChecksum, datanode, clientTraceFmt,
+ cachingStrategy);
} catch(IOException e) {
String msg = "opReadBlock " + block + " received exception " + e;
LOG.info(msg);
@@ -393,7 +395,8 @@ class DataXceiver extends Receiver imple
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
- DataChecksum requestedChecksum) throws IOException {
+ DataChecksum requestedChecksum,
+ CachingStrategy cachingStrategy) throws IOException {
previousOpClientName = clientname;
updateCurrentThreadName("Receiving block " + block);
final boolean isDatanode = clientname.length() == 0;
@@ -452,7 +455,8 @@ class DataXceiver extends Receiver imple
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
- clientname, srcDataNode, datanode, requestedChecksum);
+ clientname, srcDataNode, datanode, requestedChecksum,
+ cachingStrategy);
} else {
datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
}
@@ -497,7 +501,8 @@ class DataXceiver extends Receiver imple
new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
clientname, targets, srcDataNode, stage, pipelineSize,
- minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
+ minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
+ cachingStrategy);
mirrorOut.flush();
@@ -715,7 +720,7 @@ class DataXceiver extends Receiver imple
try {
// check if the block exists or not
blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
- null);
+ null, CachingStrategy.newDropBehind());
// set up response stream
OutputStream baseStream = getOutputStream();
@@ -846,7 +851,8 @@ class DataXceiver extends Receiver imple
blockReceiver = new BlockReceiver(
block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
- null, 0, 0, 0, "", null, datanode, remoteChecksum);
+ null, 0, 0, 0, "", null, datanode, remoteChecksum,
+ CachingStrategy.newDropBehind());
// receive a block
blockReceiver.receiveBlock(null, null, null, null,
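
Note that the internal transfer paths above (copyBlock and replaceBlock) pass CachingStrategy.newDropBehind() rather than a client-supplied strategy: data moved for replication or balancing is unlikely to be re-read on this node soon, so dropping it from the page cache is a sensible default. The factory's body is not shown in this diff; presumably it is shorthand along these lines:

    // Assumed shape of the factory used above (an assumption, not shown here):
    // drop-behind enabled, readahead left null so DataNode defaults apply.
    CachingStrategy strategy = CachingStrategy.newDropBehind();
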
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Mon Aug 12 21:25:49 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
/**
* This class defines a replica in a pipeline, which
@@ -150,11 +151,16 @@ public class ReplicaInPipeline extends R
* Interrupt the writing thread and wait until it dies
* @throws IOException the waiting is interrupted
*/
- public void stopWriter() throws IOException {
+ public void stopWriter(long xceiverStopTimeout) throws IOException {
if (writer != null && writer != Thread.currentThread() && writer.isAlive()) {
writer.interrupt();
try {
- writer.join();
+ writer.join(xceiverStopTimeout);
+ if (writer.isAlive()) {
+ final String msg = "Join on writer thread " + writer + " timed out";
+ DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(writer));
+ throw new IOException(msg);
+ }
} catch (InterruptedException e) {
throw new IOException("Waiting for writer thread is interrupted.");
}
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Mon Aug 12 21:25:49 2013
@@ -76,7 +76,6 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.DataChecksum;
@@ -615,7 +614,7 @@ class FsDatasetImpl implements FsDataset
if (replicaInfo.getState() == ReplicaState.RBW) {
ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
// kill the previous writer
- rbw.stopWriter();
+ rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
rbw.setWriter(Thread.currentThread());
// check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
if (replicaLen != rbw.getBytesOnDisk()
@@ -735,7 +734,7 @@ class FsDatasetImpl implements FsDataset
LOG.info("Recovering " + rbw);
// Stop the previous writer
- rbw.stopWriter();
+ rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
rbw.setWriter(Thread.currentThread());
// check generation stamp
@@ -1451,13 +1450,14 @@ class FsDatasetImpl implements FsDataset
@Override // FsDatasetSpi
public synchronized ReplicaRecoveryInfo initReplicaRecovery(
RecoveringBlock rBlock) throws IOException {
- return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(),
- volumeMap, rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp());
+ return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), volumeMap,
+ rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp(),
+ datanode.getDnConf().getXceiverStopTimeout());
}
/** static version of {@link #initReplicaRecovery(Block, long)}. */
- static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
- ReplicaMap map, Block block, long recoveryId) throws IOException {
+ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
+ Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
final ReplicaInfo replica = map.get(bpid, block.getBlockId());
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
+ ", replica=" + replica);
@@ -1470,7 +1470,7 @@ class FsDatasetImpl implements FsDataset
//stop writer if there is any
if (replica instanceof ReplicaInPipeline) {
final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
- rip.stopWriter();
+ rip.stopWriter(xceiverStopTimeout);
//check replica bytes on disk.
if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Mon Aug 12 21:25:49 2013
@@ -99,9 +99,19 @@ class FsVolumeList {
}
void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
+ long totalStartTime = System.currentTimeMillis();
for (FsVolumeImpl v : volumes) {
+ FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
+ " on volume " + v + "...");
+ long startTime = System.currentTimeMillis();
v.getVolumeMap(bpid, volumeMap);
- }
+ long timeTaken = System.currentTimeMillis() - startTime;
+ FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid +
+ " on volume " + v + ": " + timeTaken + "ms");
+ }
+ long totalTimeTaken = System.currentTimeMillis() - totalStartTime;
+ FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
+ + totalTimeTaken + "ms");
}
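
A small aside on the timing added above: System.currentTimeMillis() is wall-clock time and can jump if the system clock is adjusted. For pure elapsed-time measurement the monotonic System.nanoTime() is the usual alternative; a fragment mirroring the hunk (v, bpid, and volumeMap as in the code above):

    long start = System.nanoTime();
    v.getVolumeMap(bpid, volumeMap);                        // the timed call
    long elapsedMs = (System.nanoTime() - start) / 1000000; // ns -> ms
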
/**
@@ -150,10 +160,47 @@ class FsVolumeList {
}
- void addBlockPool(String bpid, Configuration conf) throws IOException {
- for (FsVolumeImpl v : volumes) {
- v.addBlockPool(bpid, conf);
+ void addBlockPool(final String bpid, final Configuration conf) throws IOException {
+ long totalStartTime = System.currentTimeMillis();
+
+ final List<IOException> exceptions = Collections.synchronizedList(
+ new ArrayList<IOException>());
+ List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
+ for (final FsVolumeImpl v : volumes) {
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
+ " on volume " + v + "...");
+ long startTime = System.currentTimeMillis();
+ v.addBlockPool(bpid, conf);
+ long timeTaken = System.currentTimeMillis() - startTime;
+ FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
+ " on " + v + ": " + timeTaken + "ms");
+ } catch (IOException ioe) {
+ FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
+ ". Will throw later.", ioe);
+ exceptions.add(ioe);
+ }
+ }
+ };
+ blockPoolAddingThreads.add(t);
+ t.start();
}
+ for (Thread t : blockPoolAddingThreads) {
+ try {
+ t.join();
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
+ }
+ if (!exceptions.isEmpty()) {
+ throw exceptions.get(0);
+ }
+
+ long totalTimeTaken = System.currentTimeMillis() - totalStartTime;
+ FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
+ bpid + ": " + totalTimeTaken + "ms");
}
void removeBlockPool(String bpid) {
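
The addBlockPool change fans one thread out per volume, joins them all, and rethrows the first recorded IOException, so volumes are scanned concurrently instead of one after another. The same fan-out/join shape, sketched with an ExecutorService instead of raw Threads ("Volume" and "scan" are stand-ins for FsVolumeImpl and addBlockPool):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class ParallelScanSketch {
      interface Volume { void scan() throws IOException; }

      static void scanAll(List<Volume> volumes) throws IOException {
        if (volumes.isEmpty()) {
          return;
        }
        ExecutorService pool = Executors.newFixedThreadPool(volumes.size());
        List<Future<Void>> futures = new ArrayList<Future<Void>>();
        for (final Volume v : volumes) {
          futures.add(pool.submit(new Callable<Void>() {
            public Void call() throws IOException {
              v.scan();   // runs concurrently; any IOException is captured
              return null;
            }
          }));
        }
        pool.shutdown();
        try {
          for (Future<Void> f : futures) {
            f.get();      // surfaces a task failure, like exceptions.get(0) above
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException(e);
        } catch (ExecutionException e) {
          throw new IOException(e.getCause());
        }
      }
    }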