You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2022/07/14 01:56:50 UTC
[hadoop] branch trunk updated: HDFS-16283. RBF: reducing the load of renewLease() RPC (#4524). Contributed by ZanderXu.
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6f9c4359eca HDFS-16283. RBF: reducing the load of renewLease() RPC (#4524). Contributed by ZanderXu.
6f9c4359eca is described below
commit 6f9c4359ecac89168d8eba35f5f4622807b0376b
Author: xuzq <15...@163.com>
AuthorDate: Thu Jul 14 09:56:40 2022 +0800
HDFS-16283. RBF: reducing the load of renewLease() RPC (#4524). Contributed by ZanderXu.
Reviewed-by: He Xiaoqiao <he...@apache.org>
Signed-off-by: Ayush Saxena <ay...@apache.org>
---
.../java/org/apache/hadoop/hdfs/DFSClient.java | 24 +++++-
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 7 ++
.../hadoop/hdfs/protocol/ClientProtocol.java | 10 ++-
.../hadoop/hdfs/protocol/HdfsFileStatus.java | 4 +
.../hdfs/protocol/HdfsLocatedFileStatus.java | 12 +++
.../hadoop/hdfs/protocol/HdfsNamedFileStatus.java | 11 +++
.../ClientNamenodeProtocolTranslatorPB.java | 12 ++-
.../hadoop/hdfs/protocolPB/PBHelperClient.java | 9 +-
.../src/main/proto/ClientNamenodeProtocol.proto | 1 +
.../hadoop-hdfs-client/src/main/proto/hdfs.proto | 1 +
.../federation/router/RouterClientProtocol.java | 54 ++++++++++--
.../server/federation/router/RouterRpcServer.java | 5 +-
.../fairness/TestRouterHandlersFairness.java | 2 +-
.../metrics/TestRouterClientMetrics.java | 2 +-
.../federation/router/TestDisableNameservices.java | 4 +-
.../router/TestRouterClientRejectOverload.java | 4 +-
.../router/TestRouterRPCClientRetries.java | 2 +-
.../server/federation/router/TestRouterRpc.java | 97 ++++++++++++++++++++++
...ientNamenodeProtocolServerSideTranslatorPB.java | 2 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 9 +-
.../apache/hadoop/hdfs/TestDFSClientRetries.java | 10 +--
.../java/org/apache/hadoop/hdfs/TestLease.java | 5 +-
22 files changed, 255 insertions(+), 32 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 2682549cba1..f314ac9c6e3 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -41,6 +41,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -579,6 +580,27 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
+ /**
+ * Get the distinct namespaces of open output streams; null if none or any is unset.
+ */
+ private List<String> getNamespaces() {
+ HashSet<String> namespaces = new HashSet<>();
+ synchronized (filesBeingWritten) {
+ for (DFSOutputStream outputStream : filesBeingWritten.values()) {
+ String namespace = outputStream.getNamespace();
+ if (namespace == null || namespace.isEmpty()) {
+ return null;
+ } else {
+ namespaces.add(namespace);
+ }
+ }
+ if (namespaces.isEmpty()) {
+ return null;
+ }
+ }
+ return new ArrayList<>(namespaces);
+ }
+
/**
* Renew leases.
* @return true if lease was renewed. May return false if this
@@ -587,7 +609,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public boolean renewLease() throws IOException {
if (clientRunning && !isFilesBeingWrittenEmpty()) {
try {
- namenode.renewLease(clientName);
+ namenode.renewLease(clientName, getNamespaces());
updateLastLeaseRenewal();
return true;
} catch (IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 0deaa41cf41..92df7c51b23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -113,6 +113,7 @@ public class DFSOutputStream extends FSOutputSummer
protected final String src;
protected final long fileId;
+ private final String namespace;
protected final long blockSize;
protected final int bytesPerChecksum;
@@ -195,6 +196,7 @@ public class DFSOutputStream extends FSOutputSummer
this.dfsClient = dfsClient;
this.src = src;
this.fileId = stat.getFileId();
+ this.namespace = stat.getNamespace();
this.blockSize = stat.getBlockSize();
this.blockReplication = stat.getReplication();
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
@@ -1084,6 +1086,11 @@ public class DFSOutputStream extends FSOutputSummer
return fileId;
}
+ @VisibleForTesting
+ public String getNamespace() {
+ return namespace;
+ }
+
/**
* Return the source of stream.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index e9ae803a541..4f2da496a1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -759,11 +759,19 @@ public interface ClientProtocol {
* the last call to renewLease(), the NameNode assumes the
* client has died.
*
+ * @param namespaces The full list of namespaces that RBF should forward
+ * this renewLease RPC to.
+ * Note: on the NameNode side this value should be null
+ * or empty. On the RBF side, if this value is null or
+ * empty, the RPC is forwarded to all available
+ * namespaces; otherwise it is forwarded only to
+ * the specified namespaces.
+ *
* @throws org.apache.hadoop.security.AccessControlException permission denied
* @throws IOException If an I/O error occurred
*/
@Idempotent
- void renewLease(String clientName) throws IOException;
+ void renewLease(String clientName, List<String> namespaces) throws IOException;
/**
* Start lease recovery.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
index 264e3f4050f..efc3b90b5a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
@@ -490,6 +490,10 @@ public interface HdfsFileStatus
*/
int compareTo(FileStatus stat);
+ void setNamespace(String namespace);
+
+ String getNamespace();
+
/**
* Set redundant flags for compatibility with existing applications.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
index bf4e0d2f9f1..a3d4867cff4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
@@ -54,6 +54,8 @@ public class HdfsLocatedFileStatus
// BlockLocations[] is the user-facing type
private transient LocatedBlocks hdfsloc;
+ private String namespace = null;
+
/**
* Constructor.
* @param length the number of bytes the file has
@@ -217,4 +219,14 @@ public class HdfsLocatedFileStatus
return this;
}
+ @Override
+ public String getNamespace() {
+ return namespace;
+ }
+
+ @Override
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsNamedFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsNamedFileStatus.java
index 9434423d721..4c90e17e4a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsNamedFileStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsNamedFileStatus.java
@@ -44,6 +44,8 @@ public class HdfsNamedFileStatus extends FileStatus implements HdfsFileStatus {
private final int childrenNum;
private final byte storagePolicy;
+ private String namespace = null;
+
/**
* Constructor.
* @param length the number of bytes the file has
@@ -177,4 +179,13 @@ public class HdfsNamedFileStatus extends FileStatus implements HdfsFileStatus {
return super.hashCode();
}
+ @Override
+ public String getNamespace() {
+ return namespace;
+ }
+
+ @Override
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 7b8ca42c50f..541a4361896 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -744,11 +744,15 @@ public class ClientNamenodeProtocolTranslatorPB implements
@Override
- public void renewLease(String clientName) throws IOException {
- RenewLeaseRequestProto req = RenewLeaseRequestProto.newBuilder()
- .setClientName(clientName).build();
+ public void renewLease(String clientName, List<String> namespaces)
+ throws IOException {
+ RenewLeaseRequestProto.Builder builder = RenewLeaseRequestProto
+ .newBuilder().setClientName(clientName);
+ if (namespaces != null && !namespaces.isEmpty()) {
+ builder.addAllNamespaces(namespaces);
+ }
try {
- rpcProxy.renewLease(null, req);
+ rpcProxy.renewLease(null, builder.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 6097d2f495d..496a5cf4614 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -1764,7 +1764,7 @@ public class PBHelperClient {
EnumSet<HdfsFileStatus.Flags> flags = fs.hasFlags()
? convertFlags(fs.getFlags())
: convertFlags(fs.getPermission());
- return new HdfsFileStatus.Builder()
+ HdfsFileStatus hdfsFileStatus = new HdfsFileStatus.Builder()
.length(fs.getLength())
.isdir(fs.getFileType().equals(FileType.IS_DIR))
.replication(fs.getBlockReplication())
@@ -1794,6 +1794,10 @@ public class PBHelperClient {
? convertErasureCodingPolicy(fs.getEcPolicy())
: null)
.build();
+ if (fs.hasNamespace()) {
+ hdfsFileStatus.setNamespace(fs.getNamespace());
+ }
+ return hdfsFileStatus;
}
private static EnumSet<HdfsFileStatus.Flags> convertFlags(int flags) {
@@ -2399,6 +2403,9 @@ public class PBHelperClient {
flags |= fs.isSnapshotEnabled() ? HdfsFileStatusProto.Flags
.SNAPSHOT_ENABLED_VALUE : 0;
builder.setFlags(flags);
+ if (fs.getNamespace() != null && !fs.getNamespace().isEmpty()) {
+ builder.setNamespace(fs.getNamespace());
+ }
return builder.build();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index 1e8d0b0a266..60792b5b6c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -332,6 +332,7 @@ message GetSnapshotDiffReportListingResponseProto {
}
message RenewLeaseRequestProto {
required string clientName = 1;
+ repeated string namespaces = 2;
}
message RenewLeaseResponseProto { //void response
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 163d3a49d30..a4d36180c2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -481,6 +481,7 @@ message HdfsFileStatusProto {
// Set of flags
optional uint32 flags = 18 [default = 0];
+ optional string namespace = 19;
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index c1dafec9220..73445595de7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -291,8 +291,10 @@ public class RouterClientProtocol implements ClientProtocol {
RemoteLocation createLocation = null;
try {
createLocation = rpcServer.getCreateLocation(src, locations);
- return rpcClient.invokeSingle(createLocation, method,
+ HdfsFileStatus status = rpcClient.invokeSingle(createLocation, method,
HdfsFileStatus.class);
+ status.setNamespace(createLocation.getNameserviceId());
+ return status;
} catch (IOException ioe) {
final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
method, src, ioe, createLocation, locations);
@@ -377,8 +379,11 @@ public class RouterClientProtocol implements ClientProtocol {
RemoteMethod method = new RemoteMethod("append",
new Class<?>[] {String.class, String.class, EnumSetWritable.class},
new RemoteParam(), clientName, flag);
- return rpcClient.invokeSequential(
- locations, method, LastBlockWithStatus.class, null);
+ RemoteResult result = rpcClient.invokeSequential(
+ method, locations, LastBlockWithStatus.class, null);
+ LastBlockWithStatus lbws = (LastBlockWithStatus) result.getResult();
+ lbws.getFileStatus().setNamespace(result.getLocation().getNameserviceId());
+ return lbws;
}
@Override
@@ -759,14 +764,49 @@ public class RouterClientProtocol implements ClientProtocol {
}
}
+ private Map<String, FederationNamespaceInfo> getAvailableNamespaces()
+ throws IOException {
+ Map<String, FederationNamespaceInfo> allAvailableNamespaces =
+ new HashMap<>();
+ namenodeResolver.getNamespaces().forEach(
+ k -> allAvailableNamespaces.put(k.getNameserviceId(), k));
+ return allAvailableNamespaces;
+ }
+
+ /**
+ * Resolve the namespaces for a renewLease RPC, falling back to all known namespaces.
+ */
+ private List<FederationNamespaceInfo> getRenewLeaseNSs(List<String> namespaces)
+ throws IOException {
+ if (namespaces == null || namespaces.isEmpty()) {
+ return new ArrayList<>(namenodeResolver.getNamespaces());
+ }
+ List<FederationNamespaceInfo> result = new ArrayList<>();
+ Map<String, FederationNamespaceInfo> allAvailableNamespaces =
+ getAvailableNamespaces();
+ for (String namespace : namespaces) {
+ if (!allAvailableNamespaces.containsKey(namespace)) {
+ return new ArrayList<>(namenodeResolver.getNamespaces());
+ } else {
+ result.add(allAvailableNamespaces.get(namespace));
+ }
+ }
+ return result;
+ }
+
@Override
- public void renewLease(String clientName) throws IOException {
+ public void renewLease(String clientName, List<String> namespaces)
+ throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
RemoteMethod method = new RemoteMethod("renewLease",
- new Class<?>[] {String.class}, clientName);
- Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- rpcClient.invokeConcurrent(nss, method, false, false);
+ new Class<?>[] {String.class, List.class}, clientName, null);
+ List<FederationNamespaceInfo> nss = getRenewLeaseNSs(namespaces);
+ if (nss.size() == 1) {
+ rpcClient.invokeSingle(nss.get(0).getNameserviceId(), method);
+ } else {
+ rpcClient.invokeConcurrent(nss, method, false, false);
+ }
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 58181dcc346..980d64a45d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -980,8 +980,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
}
@Override // ClientProtocol
- public void renewLease(String clientName) throws IOException {
- clientProto.renewLease(clientName);
+ public void renewLease(String clientName, List<String> namespaces)
+ throws IOException {
+ clientProto.renewLease(clientName, namespaces);
}
@Override // ClientProtocol
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java
index 2d27c66e37e..8fc9de0cb26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java
@@ -194,7 +194,7 @@ public class TestRouterHandlersFairness {
private void invokeConcurrent(ClientProtocol routerProto, String clientName)
throws IOException {
- routerProto.renewLease(clientName);
+ routerProto.renewLease(clientName, null);
}
private int getTotalRejectedPermits(RouterContext routerContext) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestRouterClientMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestRouterClientMetrics.java
index da16c059107..3397718745f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestRouterClientMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestRouterClientMetrics.java
@@ -156,7 +156,7 @@ public class TestRouterClientMetrics {
@Test
public void testRenewLease() throws Exception {
- router.getRpcServer().renewLease("test");
+ router.getRpcServer().renewLease("test", null);
assertCounter("RenewLeaseOps", 2L, getMetrics(ROUTER_METRICS));
assertCounter("ConcurrentRenewLeaseOps", 1L, getMetrics(ROUTER_METRICS));
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableNameservices.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableNameservices.java
index ae04150d70f..78f41c5d92a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableNameservices.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableNameservices.java
@@ -159,7 +159,7 @@ public class TestDisableNameservices {
public void testWithoutDisabling() throws IOException {
// ns0 is slow and renewLease should take a long time
long t0 = monotonicNow();
- routerProtocol.renewLease("client0");
+ routerProtocol.renewLease("client0", null);
long t = monotonicNow() - t0;
assertTrue("It took too little: " + t + "ms",
t > TimeUnit.SECONDS.toMillis(1));
@@ -178,7 +178,7 @@ public class TestDisableNameservices {
// renewLease should be fast as we are skipping ns0
long t0 = monotonicNow();
- routerProtocol.renewLease("client0");
+ routerProtocol.renewLease("client0", null);
long t = monotonicNow() - t0;
assertTrue("It took too long: " + t + "ms",
t < TimeUnit.SECONDS.toMillis(1));
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
index 71ec747af4c..04cfb5c9d90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
@@ -215,7 +215,7 @@ public class TestRouterClientRejectOverload {
routerClient = new DFSClient(address, conf);
String clientName = routerClient.getClientName();
ClientProtocol routerProto = routerClient.getNamenode();
- routerProto.renewLease(clientName);
+ routerProto.renewLease(clientName, null);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException();
assertTrue("Wrong exception: " + ioe,
@@ -390,7 +390,7 @@ public class TestRouterClientRejectOverload {
cluster.getRouterClientConf());
String clientName = routerClient.getClientName();
ClientProtocol routerProto = routerClient.getNamenode();
- routerProto.renewLease(clientName);
+ routerProto.renewLease(clientName, null);
} catch (Exception e) {
fail("Client request failed: " + e);
} finally {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
index 73803d98052..039acbb5988 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
@@ -153,7 +153,7 @@ public class TestRouterRPCClientRetries {
DFSClient client = nnContext1.getClient();
// Renew lease for the DFS client, it will succeed.
- routerProtocol.renewLease(client.getClientName());
+ routerProtocol.renewLease(client.getClientName(), null);
// Verify the retry times, it will retry one time for ns0.
FederationRPCMetrics rpcMetrics = routerContext.getRouter()
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index 4aeb2ec9b8f..f71145a4522 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -107,6 +108,7 @@ import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeCon
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@@ -1450,6 +1452,101 @@ public class TestRouterRpc {
assertEquals(nnSuccess, routerSuccess);
}
+ private void testRenewLeaseInternal(DistributedFileSystem dfs,
+ FederationRPCMetrics rpcMetrics, Path testPath, boolean createFlag)
+ throws Exception {
+ FSDataOutputStream outputStream = null;
+ try {
+ if (createFlag) {
+ outputStream = dfs.create(testPath);
+ } else {
+ outputStream = dfs.append(testPath);
+ }
+ outputStream.write("hello world. \n".getBytes());
+ long proxyOpBeforeRenewLease = rpcMetrics.getProxyOps();
+ assertTrue(dfs.getClient().renewLease());
+ long proxyOpAfterRenewLease = rpcMetrics.getProxyOps();
+ assertEquals((proxyOpBeforeRenewLease + 1), proxyOpAfterRenewLease);
+ } finally {
+ if (outputStream != null) {
+ outputStream.close();
+ }
+ }
+ }
+
+ @Test
+ public void testRenewLeaseForECFile() throws Exception {
+ String ecName = "RS-6-3-1024k";
+ FederationRPCMetrics metrics = router.getRouterRpcServer().getRPCMetrics();
+ // Install a mount point to a different path to check
+ MockResolver resolver =
+ (MockResolver)router.getRouter().getSubclusterResolver();
+ String ns0 = cluster.getNameservices().get(0);
+ resolver.addLocation("/testRenewLease0", ns0, "/testRenewLease0");
+
+ // Stop LeaseRenewer
+ DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS;
+ routerDFS.getClient().getLeaseRenewer().interruptAndJoin();
+
+ Path testECPath = new Path("/testRenewLease0/ecDirectory/test_ec.txt");
+ routerDFS.mkdirs(testECPath.getParent());
+ routerDFS.setErasureCodingPolicy(
+ testECPath.getParent(), ecName);
+ testRenewLeaseInternal(routerDFS, metrics, testECPath, true);
+
+ ErasureCodingPolicy ecPolicy = routerDFS.getErasureCodingPolicy(testECPath);
+ assertNotNull(ecPolicy);
+ assertEquals(ecName, ecPolicy.getName());
+ }
+
+
+ @Test
+ public void testRenewLeaseForReplicaFile() throws Exception {
+ FederationRPCMetrics metrics = router.getRouterRpcServer().getRPCMetrics();
+ // Install a mount point to a different path to check
+ MockResolver resolver =
+ (MockResolver)router.getRouter().getSubclusterResolver();
+ String ns0 = cluster.getNameservices().get(0);
+ resolver.addLocation("/testRenewLease0", ns0, "/testRenewLease0");
+
+ // Stop LeaseRenewer
+ DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS;
+ routerDFS.getClient().getLeaseRenewer().interruptAndJoin();
+
+ // Test Replica File
+ Path testPath = new Path("/testRenewLease0/test_replica.txt");
+ testRenewLeaseInternal(routerDFS, metrics, testPath, true);
+ testRenewLeaseInternal(routerDFS, metrics, testPath, false);
+ }
+
+ @Test
+ public void testRenewLeaseWithMultiStream() throws Exception {
+ FederationRPCMetrics metrics = router.getRouterRpcServer().getRPCMetrics();
+ // Install a mount point to a different path to check
+ MockResolver resolver =
+ (MockResolver)router.getRouter().getSubclusterResolver();
+ String ns0 = cluster.getNameservices().get(0);
+ String ns1 = cluster.getNameservices().get(1);
+ resolver.addLocation("/testRenewLease0", ns0, "/testRenewLease0");
+ resolver.addLocation("/testRenewLease1", ns1, "/testRenewLease1");
+
+ // Stop LeaseRenewer
+ DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS;
+ routerDFS.getClient().getLeaseRenewer().interruptAndJoin();
+
+ Path newTestPath0 = new Path("/testRenewLease0/test1.txt");
+ Path newTestPath1 = new Path("/testRenewLease1/test1.txt");
+ try (FSDataOutputStream outStream1 = routerDFS.create(newTestPath0);
+ FSDataOutputStream outStream2 = routerDFS.create(newTestPath1)) {
+ outStream1.write("hello world \n".getBytes());
+ outStream2.write("hello world \n".getBytes());
+ long proxyOpBeforeRenewLease2 = metrics.getProxyOps();
+ assertTrue(routerDFS.getClient().renewLease());
+ long proxyOpAfterRenewLease2 = metrics.getProxyOps();
+ assertEquals((proxyOpBeforeRenewLease2 + 2), proxyOpAfterRenewLease2);
+ }
+ }
+
@Test
public void testProxyExceptionMessages() throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 0164f25460d..79c122cf5ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -818,7 +818,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public RenewLeaseResponseProto renewLease(RpcController controller,
RenewLeaseRequestProto req) throws ServiceException {
try {
- server.renewLease(req.getClientName());
+ server.renewLease(req.getClientName(), req.getNamespacesList());
return VOID_RENEWLEASE_RESPONSE;
} catch (IOException e) {
throw new ServiceException(e);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index c86211b058b..1d50bc5cb53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1174,7 +1174,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // ClientProtocol
- public void renewLease(String clientName) throws IOException {
+ public void renewLease(String clientName, List<String> namespaces)
+ throws IOException {
+ if (namespaces != null && namespaces.size() > 0) {
+ LOG.warn("namespaces({}) should be null or empty "
+ + "on NameNode side, please check it.", namespaces);
+ throw new IOException("namespaces(" + namespaces
+ + ") should be null or empty");
+ }
checkNNStartup();
namesystem.renewLease(clientName);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index 970003b0e58..c335d38f733 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -384,7 +384,7 @@ public class TestDFSClientRetries {
cluster.waitActive();
NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc());
Mockito.doThrow(new SocketTimeoutException()).when(spyNN).renewLease(
- Mockito.anyString());
+ Mockito.anyString(), any());
DFSClient client = new DFSClient(null, spyNN, conf, null);
// Get hold of the lease renewer instance used by the client
final LeaseRenewer leaseRenewer1 = client.getLeaseRenewer();
@@ -392,7 +392,7 @@ public class TestDFSClientRetries {
OutputStream out1 = client.create(file1, false);
Mockito.verify(spyNN, timeout(10000).times(1)).renewLease(
- Mockito.anyString());
+ Mockito.anyString(), any());
verifyEmptyLease(leaseRenewer1);
GenericTestUtils.waitFor(() -> !(leaseRenewer1.isRunning()), 100, 10000);
try {
@@ -406,12 +406,12 @@ public class TestDFSClientRetries {
// Verify DFSClient can do write operation after renewLease no longer
// throws SocketTimeoutException.
Mockito.doNothing().when(spyNN).renewLease(
- Mockito.anyString());
+ Mockito.anyString(), any());
final LeaseRenewer leaseRenewer2 = client.getLeaseRenewer();
leaseRenewer2.setRenewalTime(100);
OutputStream out2 = client.create(file2, false);
Mockito.verify(spyNN, timeout(10000).times(2)).renewLease(
- Mockito.anyString());
+ Mockito.anyString(), any());
out2.write(new byte[256]);
out2.close();
verifyEmptyLease(leaseRenewer2);
@@ -1309,7 +1309,7 @@ public class TestDFSClientRetries {
try {
//1. trigger get LeaseRenewer lock
Mockito.doThrow(new SocketTimeoutException()).when(spyNN)
- .renewLease(Mockito.anyString());
+ .renewLease(Mockito.anyString(), any());
} catch (IOException e) {
e.printStackTrace();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
index 5d7b62a4284..8b527d07a29 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
@@ -90,7 +90,8 @@ public class TestLease {
// stub the renew method.
doThrow(new RemoteException(InvalidToken.class.getName(),
- "Your token is worthless")).when(spyNN).renewLease(anyString());
+ "Your token is worthless")).when(spyNN).renewLease(
+ anyString(), any());
// We don't need to wait the lease renewer thread to act.
// call renewLease() manually.
@@ -131,7 +132,7 @@ public class TestLease {
Assert.assertTrue(originalRenewer.isEmpty());
// unstub
- doNothing().when(spyNN).renewLease(anyString());
+ doNothing().when(spyNN).renewLease(anyString(), any());
// existing input streams should work
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org