You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by za...@apache.org on 2022/11/25 01:23:42 UTC
[hadoop] branch trunk updated: HDFS-16826. [RBF SBN] ConnectionManager should advance the client stateId for each request (#5086)
This is an automated email from the ASF dual-hosted git repository.
zanderxu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new e0974298cef HDFS-16826. [RBF SBN] ConnectionManager should advance the client stateId for each request (#5086)
e0974298cef is described below
commit e0974298cef072181ee2b639af8eac46cd712245
Author: ZanderXu <za...@apache.org>
AuthorDate: Fri Nov 25 09:23:33 2022 +0800
HDFS-16826. [RBF SBN] ConnectionManager should advance the client stateId for each request (#5086)
---
.../federation/router/ConnectionManager.java | 5 ++-
.../federation/router/PoolAlignmentContext.java | 7 ++++
.../federation/router/TestConnectionManager.java | 49 ++++++++++++++++++++++
3 files changed, 59 insertions(+), 2 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
index c6db9837c7c..eeaa9cd4b34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
@@ -226,13 +226,14 @@ public class ConnectionManager {
this.pools.put(connectionId, pool);
this.connectionPoolToNamespaceMap.put(connectionId, nsId);
}
- long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId);
- pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);
} finally {
writeLock.unlock();
}
}
+ long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId);
+ pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);
+
ConnectionContext conn = pool.getConnection();
// Add a new connection to the pool if it wasn't usable
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java
index ccb7e9762af..1f2b12d445f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.util.concurrent.atomic.LongAccumulator;
+
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
@@ -99,4 +101,9 @@ public class PoolAlignmentContext implements AlignmentContext {
public void advanceClientStateId(Long clientStateId) {
poolLocalStateId.accumulate(clientStateId);
}
+
+ @VisibleForTesting
+ public long getPoolLocalStateId() {
+ return this.poolLocalStateId.get();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
index 067d43dabd5..42288bcf53a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -18,8 +18,11 @@
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
@@ -31,6 +34,7 @@ import org.junit.Rule;
import org.junit.rules.ExpectedException;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -305,6 +309,51 @@ public class TestConnectionManager {
}
}
+ @Test
+ public void testAdvanceClientStateId() throws IOException {
+ // Start one ConnectionManager
+ Configuration tmpConf = new Configuration();
+ ConnectionManager tmpConnManager = new ConnectionManager(tmpConf);
+ tmpConnManager.start();
+ Map<ConnectionPoolId, ConnectionPool> poolMap = tmpConnManager.getPools();
+
+ // Mock one Server.Call with FederatedNamespaceState that ns0 = 1L.
+ Server.Call mockCall1 = new Server.Call(1, 1, null, null,
+ RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3});
+ Map<String, Long> nsStateId = new HashMap<>();
+ nsStateId.put("ns0", 1L);
+ RouterFederatedStateProto.Builder stateBuilder = RouterFederatedStateProto.newBuilder();
+ nsStateId.forEach(stateBuilder::putNamespaceStateIds);
+ mockCall1.setFederatedNamespaceState(stateBuilder.build().toByteString());
+
+ Server.getCurCall().set(mockCall1);
+
+ // Create one new connection pool
+ tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class, "ns0");
+ assertEquals(1, poolMap.size());
+ ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1,
+ TEST_NN_ADDRESS, NamenodeProtocol.class);
+ ConnectionPool pool = poolMap.get(connectionPoolId);
+ assertEquals(1L, pool.getPoolAlignmentContext().getPoolLocalStateId());
+
+ // Mock one Server.Call with FederatedNamespaceState that ns0 = 2L.
+ Server.Call mockCall2 = new Server.Call(2, 1, null, null,
+ RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3});
+ nsStateId.clear();
+ nsStateId.put("ns0", 2L);
+ stateBuilder = RouterFederatedStateProto.newBuilder();
+ nsStateId.forEach(stateBuilder::putNamespaceStateIds);
+ mockCall2.setFederatedNamespaceState(stateBuilder.build().toByteString());
+
+ Server.getCurCall().set(mockCall2);
+
+ // Get one existing connection for ns0
+ tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class, "ns0");
+ assertEquals(1, poolMap.size());
+ pool = poolMap.get(connectionPoolId);
+ assertEquals(2L, pool.getPoolAlignmentContext().getPoolLocalStateId());
+ }
+
@Test
public void testConfigureConnectionActiveRatio() throws IOException {
// test 1 conn below the threshold and these conns are closed
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org