You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2013/09/06 19:24:45 UTC
svn commit: r1520639 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
Author: jing9
Date: Fri Sep 6 17:24:44 2013
New Revision: 1520639
URL: http://svn.apache.org/r1520639
Log:
HDFS-5118. Merge change r1520637 from trunk.
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1520639&r1=1520638&r2=1520639&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Sep 6 17:24:44 2013
@@ -6,6 +6,9 @@ Release 2.3.0 - UNRELEASED
NEW FEATURES
+ HDFS-5118. Provide testing support for DFSClient to drop RPC responses.
+ (jing9)
+
IMPROVEMENTS
HDFS-4657. Limit the number of blocks logged by the NN after a block
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1520639&r1=1520638&r2=1520639&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 6 17:24:44 2013
@@ -27,6 +27,9 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
@@ -44,9 +47,6 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
@@ -102,6 +102,7 @@ import org.apache.hadoop.fs.MD5MD5CRC32G
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -115,13 +116,13 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@@ -146,6 +147,7 @@ import org.apache.hadoop.io.EnumSetWrita
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -453,7 +455,11 @@ public class DFSClient implements java.i
/**
* Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
- * Exactly one of nameNodeUri or rpcNamenode must be null.
+ * If HA is enabled and a positive value is set for
+ * {@link DFSConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} in the
+ * configuration, the DFSClient will use {@link LossyRetryInvocationHandler}
+ * as its RetryInvocationHandler. Otherwise one of nameNodeUri or rpcNamenode
+ * must be null.
*/
@VisibleForTesting
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
@@ -477,7 +483,20 @@ public class DFSClient implements java.i
this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
- if (rpcNamenode != null) {
+ int numResponseToDrop = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
+ DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
+ if (numResponseToDrop > 0) {
+ // This case is used for testing.
+ LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
+ + " is set to " + numResponseToDrop
+ + ", this hacked client will proactively drop responses");
+ NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies
+ .createProxyWithLossyRetryHandler(conf, nameNodeUri,
+ ClientProtocol.class, numResponseToDrop);
+ this.dtService = proxyInfo.getDelegationTokenService();
+ this.namenode = proxyInfo.getProxy();
+ } else if (rpcNamenode != null) {
// This case is used for testing.
Preconditions.checkArgument(nameNodeUri == null);
this.namenode = rpcNamenode;
@@ -516,7 +535,7 @@ public class DFSClient implements java.i
this.defaultWriteCachingStrategy =
new CachingStrategy(writeDropBehind, readahead);
}
-
+
/**
* Return the socket addresses to use with each configured
* local interface. Local interfaces may be specified by IP
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1520639&r1=1520638&r2=1520639&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Sep 6 17:24:44 2013
@@ -497,6 +497,12 @@ public class DFSConfigKeys extends Commo
public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
+
+ // The number of NN response dropped by client proactively in each RPC call.
+ // For testing NN retry cache, we can set this property with positive value.
+ public static final String DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY = "dfs.client.test.drop.namenode.response.number";
+ public static final int DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT = 0;
+
// Hidden configuration undocumented in hdfs-site. xml
// Timeout to wait for block receiver and responder thread to stop
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1520639&r1=1520638&r2=1520639&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Fri Sep 6 17:24:44 2013
@@ -17,10 +17,18 @@
*/
package org.apache.hadoop.hdfs;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
@@ -48,6 +56,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
@@ -144,6 +153,61 @@ public class NameNodeProxies {
return new ProxyAndInfo<T>(proxy, dtService);
}
}
+
+ /**
+ * Generate a dummy namenode proxy instance that utilizes our hacked
+ * {@link LossyRetryInvocationHandler}. Proxy instance generated using this
+ * method will proactively drop RPC responses. Currently this method only
+ * support HA setup. IllegalStateException will be thrown if the given
+ * configuration is not for HA.
+ *
+ * @param config the configuration containing the required IPC
+ * properties, client failover configurations, etc.
+ * @param nameNodeUri the URI pointing either to a specific NameNode
+ * or to a logical nameservice.
+ * @param xface the IPC interface which should be created
+ * @param numResponseToDrop The number of responses to drop for each RPC call
+ * @return an object containing both the proxy and the associated
+ * delegation token service it corresponds to
+ * @throws IOException if there is an error creating the proxy
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
+ Configuration config, URI nameNodeUri, Class<T> xface,
+ int numResponseToDrop) throws IOException {
+ Preconditions.checkArgument(numResponseToDrop > 0);
+ Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
+ getFailoverProxyProviderClass(config, nameNodeUri, xface);
+ if (failoverProxyProviderClass != null) { // HA case
+ FailoverProxyProvider<T> failoverProxyProvider =
+ createFailoverProxyProvider(config, failoverProxyProviderClass,
+ xface, nameNodeUri);
+ int delay = config.getInt(
+ DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
+ DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
+ int maxCap = config.getInt(
+ DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
+ DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
+ int maxFailoverAttempts = config.getInt(
+ DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+ DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+ InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
+ numResponseToDrop, failoverProxyProvider,
+ RetryPolicies.failoverOnNetworkException(
+ RetryPolicies.TRY_ONCE_THEN_FAIL,
+ Math.max(numResponseToDrop + 1, maxFailoverAttempts), delay,
+ maxCap));
+
+ T proxy = (T) Proxy.newProxyInstance(
+ failoverProxyProvider.getInterface().getClassLoader(),
+ new Class[] { xface }, dummyHandler);
+ Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
+ return new ProxyAndInfo<T>(proxy, dtService);
+ } else {
+ throw new IllegalStateException("Currently creating proxy using " +
+ "LossyRetryInvocationHandler requires NN HA setup");
+ }
+ }
/**
* Creates an explicitly non-HA-enabled proxy object. Most of the time you