You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/27 09:11:55 UTC
[incubator-ratis] branch master updated: RATIS-1269. Add a new
AdminApi for RaftClient. (#379)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 379122a RATIS-1269. Add a new AdminApi for RaftClient. (#379)
379122a is described below
commit 379122af36caa059295eef4ac693c68e8b74a397
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Dec 27 17:11:48 2020 +0800
RATIS-1269. Add a new AdminApi for RaftClient. (#379)
---
.../java/org/apache/ratis/client/RaftClient.java | 17 +++----
.../java/org/apache/ratis/client/api/AdminApi.java | 43 ++++++++++++++++
.../org/apache/ratis/client/impl/AdminImpl.java | 57 ++++++++++++++++++++++
.../apache/ratis/client/impl/RaftClientImpl.java | 52 +++++++-------------
.../org/apache/ratis/RaftExceptionBaseTest.java | 4 +-
.../ratis/server/impl/GroupManagementBaseTest.java | 5 +-
.../ratis/server/impl/LeaderElectionTests.java | 8 +--
.../apache/ratis/server/impl/MiniRaftCluster.java | 2 +-
.../server/impl/RaftReconfigurationBaseTest.java | 15 +++---
9 files changed, 140 insertions(+), 63 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 78e0b77..62d62ac 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -18,6 +18,7 @@
package org.apache.ratis.client;
import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.client.api.AdminApi;
import org.apache.ratis.client.api.AsyncApi;
import org.apache.ratis.client.api.BlockingApi;
import org.apache.ratis.client.api.DataStreamApi;
@@ -34,7 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.io.IOException;
import java.util.Objects;
/** A client who sends requests to a raft service. */
@@ -50,27 +50,24 @@ public interface RaftClient extends Closeable {
/** @return the {@link RaftClientRpc}. */
RaftClientRpc getClientRpc();
+ /** @return the {@link AdminApi}. */
+ AdminApi admin();
+
/** Get the {@link GroupManagementApi} for the given server. */
GroupManagementApi getGroupManagementApi(RaftPeerId server);
+ /** @return the {@link BlockingApi}. */
+ BlockingApi io();
+
/** Get the {@link AsyncApi}. */
AsyncApi async();
/** @return the {@link MessageStreamApi}. */
MessageStreamApi getMessageStreamApi();
- /** @return the {@link BlockingApi}. */
- BlockingApi io();
-
/** @return the {@link DataStreamApi}. */
DataStreamApi getDataStreamApi();
- /** Send set configuration request to the raft service. */
- RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException;
-
- /** Transfer leadership to the given server.*/
- RaftClientReply transferLeadership(RaftGroupId group, RaftPeerId newLeader, long timeoutMs) throws IOException;
-
/** @return a {@link Builder}. */
static Builder newBuilder() {
return new Builder();
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java
new file mode 100644
index 0000000..175fd95
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.api;
+
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * An API to support administration
+ * such as setting raft configuration and transferring leadership.
+ */
+public interface AdminApi {
+ /** Set the configuration request to the raft service. */
+ RaftClientReply setConfiguration(List<RaftPeer> serversInNewConf) throws IOException;
+
+ /** The same as setConfiguration(Arrays.asList(serversInNewConf)). */
+ default RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException {
+ return setConfiguration(Arrays.asList(serversInNewConf));
+ }
+
+ /** Transfer leadership to the given server.*/
+ RaftClientReply transferLeadership(RaftPeerId newLeader, long timeoutMs) throws IOException;
+}
\ No newline at end of file
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java
new file mode 100644
index 0000000..b2160b5
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import org.apache.ratis.client.api.AdminApi;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
+import org.apache.ratis.rpc.CallId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+class AdminImpl implements AdminApi {
+ private final RaftClientImpl client;
+
+ AdminImpl(RaftClientImpl client) {
+ this.client = Objects.requireNonNull(client, "client == null");
+ }
+
+ @Override
+ public RaftClientReply setConfiguration(List<RaftPeer> peersInNewConf) throws IOException {
+ Objects.requireNonNull(peersInNewConf, "peersInNewConf == null");
+
+ final long callId = CallId.getAndIncrement();
+ // also refresh the rpc proxies for these peers
+ client.getClientRpc().addRaftPeers(peersInNewConf);
+ return client.io().sendRequestWithRetry(() -> new SetConfigurationRequest(
+ client.getId(), client.getLeaderId(), client.getGroupId(), callId, peersInNewConf));
+ }
+
+ @Override
+ public RaftClientReply transferLeadership(RaftPeerId newLeader, long timeoutMs) throws IOException {
+ Objects.requireNonNull(newLeader, "newLeader == null");
+ final long callId = CallId.getAndIncrement();
+ return client.io().sendRequestWithRetry(() -> new TransferLeadershipRequest(
+ client.getId(), client.getLeaderId(), client.getGroupId(), callId, newLeader, timeoutMs));
+ }
+}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 25a1586..950da1f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -21,8 +21,6 @@ import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.client.api.DataStreamApi;
-import org.apache.ratis.client.api.GroupManagementApi;
-import org.apache.ratis.client.api.MessageStreamApi;
import org.apache.ratis.client.retry.ClientRetryEvent;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
@@ -34,15 +32,12 @@ import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.SetConfigurationRequest;
-import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.CallId;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
@@ -50,7 +45,6 @@ import org.apache.ratis.util.TimeoutScheduler;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -60,6 +54,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -130,11 +125,14 @@ public final class RaftClientImpl implements RaftClient {
private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private final Supplier<OrderedAsync> orderedAsync;
- private final Supplier<MessageStreamApi> streamApi;
private final Supplier<AsyncImpl> asyncApi;
private final Supplier<BlockingImpl> blockingApi;
+ private final Supplier<MessageStreamImpl> messageStreamApi;
private final Supplier<DataStreamApi> dataStreamApi;
+ private final Supplier<AdminImpl> adminApi;
+ private final ConcurrentMap<RaftPeerId, GroupManagementImpl> groupManagmenets = new ConcurrentHashMap<>();
+
RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, RaftPeer primaryDataStreamServer,
RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy retryPolicy) {
this.clientId = clientId;
@@ -147,7 +145,7 @@ public final class RaftClientImpl implements RaftClient {
this.clientRpc = clientRpc;
this.orderedAsync = JavaUtils.memoize(() -> OrderedAsync.newInstance(this, properties));
- this.streamApi = JavaUtils.memoize(() -> MessageStreamImpl.newInstance(this, properties));
+ this.messageStreamApi = JavaUtils.memoize(() -> MessageStreamImpl.newInstance(this, properties));
this.asyncApi = JavaUtils.memoize(() -> new AsyncImpl(this));
this.blockingApi = JavaUtils.memoize(() -> new BlockingImpl(this));
this.dataStreamApi = JavaUtils.memoize(() -> DataStreamClient.newBuilder()
@@ -156,6 +154,7 @@ public final class RaftClientImpl implements RaftClient {
.setDataStreamServer(primaryDataStreamServer)
.setProperties(properties)
.build());
+ this.adminApi = JavaUtils.memoize(() -> new AdminImpl(this));
}
public RaftPeerId getLeaderId() {
@@ -201,11 +200,6 @@ public final class RaftClientImpl implements RaftClient {
return orderedAsync.get();
}
- @Override
- public MessageStreamApi getMessageStreamApi() {
- return streamApi.get();
- }
-
RaftClientRequest newRaftClientRequest(
RaftPeerId server, long callId, Message message, RaftClientRequest.Type type,
SlidingWindowEntry slidingWindowEntry) {
@@ -221,40 +215,28 @@ public final class RaftClientImpl implements RaftClient {
}
@Override
- public RaftClientReply transferLeadership(RaftGroupId raftGroupId, RaftPeerId newLeader, long timeoutMs)
- throws IOException {
- Objects.requireNonNull(newLeader, "newLeader == null");
- final long callId = CallId.getAndIncrement();
- return io().sendRequestWithRetry(() -> new TransferLeadershipRequest(
- clientId, leaderId, groupId, callId, newLeader, timeoutMs));
+ public AdminImpl admin() {
+ return adminApi.get();
}
- // TODO: change peersInNewConf to List<RaftPeer>
@Override
- public RaftClientReply setConfiguration(RaftPeer[] peersInNewConf)
- throws IOException {
- Objects.requireNonNull(peersInNewConf, "peersInNewConf == null");
-
- final long callId = CallId.getAndIncrement();
- // also refresh the rpc proxies for these peers
- clientRpc.addRaftPeers(peersInNewConf);
- return io().sendRequestWithRetry(() -> new SetConfigurationRequest(
- clientId, leaderId, groupId, callId, Arrays.asList(peersInNewConf)));
+ public GroupManagementImpl getGroupManagementApi(RaftPeerId server) {
+ return groupManagmenets.computeIfAbsent(server, id -> new GroupManagementImpl(id, this));
}
@Override
- public AsyncImpl async() {
- return asyncApi.get();
+ public BlockingImpl io() {
+ return blockingApi.get();
}
@Override
- public GroupManagementApi getGroupManagementApi(RaftPeerId server) {
- return new GroupManagementImpl(server, this);
+ public AsyncImpl async() {
+ return asyncApi.get();
}
@Override
- public BlockingImpl io() {
- return blockingApi.get();
+ public MessageStreamImpl getMessageStreamApi() {
+ return messageStreamApi.get();
}
@Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 864dd47..5a51997 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -113,7 +113,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
// trigger setConfiguration
LOG.info("Start changing the configuration: {}", Arrays.asList(change.allPeersInNewConf));
try (final RaftClient c2 = cluster.createClient(newLeader)) {
- RaftClientReply reply = c2.setConfiguration(change.allPeersInNewConf);
+ RaftClientReply reply = c2.admin().setConfiguration(change.allPeersInNewConf);
Assert.assertTrue(reply.isSuccess());
}
LOG.info(cluster.printServers());
@@ -158,7 +158,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
GroupMismatchException.class);
testFailureCase("setConfiguration(..) with client group being different from the server group",
- () -> client.setConfiguration(RaftPeer.emptyArray()),
+ () -> client.admin().setConfiguration(RaftPeer.emptyArray()),
GroupMismatchException.class);
testFailureCase("groupRemove(..) with another group id",
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 416c17a..7fcd6fc 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -153,7 +152,7 @@ public abstract class GroupManagementBaseTest extends BaseTest {
final int newSuggestedLeaderIndex = (suggestedLeaderIndex + 1) % peersWithPriority.size();
List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, peers.get(newSuggestedLeaderIndex));
try (final RaftClient client = cluster.createClient(newGroup)) {
- RaftClientReply reply = client.setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
+ RaftClientReply reply = client.admin().setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
Assert.assertTrue(reply.isSuccess());
}
@@ -326,7 +325,7 @@ public abstract class GroupManagementBaseTest extends BaseTest {
}
LOG.info(chosen + ") setConfiguration: " + cluster.printServers(groups[chosen].getGroupId()));
try (final RaftClient client = cluster.createClient(groups[chosen])) {
- client.setConfiguration(allPeers.toArray(RaftPeer.emptyArray()));
+ client.admin().setConfiguration(allPeers.toArray(RaftPeer.emptyArray()));
}
Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 4a45bbe..2605b29 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -136,10 +136,10 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
List<RaftPeer> peers = cluster.getPeers();
List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, newLeader.getPeer());
- RaftClientReply reply = client.setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
+ RaftClientReply reply = client.admin().setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
Assert.assertTrue(reply.isSuccess());
- reply = client.transferLeadership(leader.getGroup().getGroupId(), newLeader.getId(), 20000);
+ reply = client.admin().transferLeadership(newLeader.getId(), 20000);
assertTrue(reply.isSuccess());
final RaftServer.Division currLeader = waitForLeader(cluster);
@@ -170,7 +170,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
List<RaftPeer> peers = cluster.getPeers();
List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, newLeader.getPeer());
- RaftClientReply reply = client.setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
+ RaftClientReply reply = client.admin().setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
Assert.assertTrue(reply.isSuccess());
CompletableFuture<Boolean> transferTimeoutFuture = CompletableFuture.supplyAsync(() -> {
@@ -178,7 +178,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
long timeoutMs = 5000;
long start = System.currentTimeMillis();
try {
- client.transferLeadership(leader.getGroup().getGroupId(), newLeader.getId(), timeoutMs);
+ client.admin().transferLeadership(newLeader.getId(), timeoutMs);
} catch (TransferLeadershipException e) {
long cost = System.currentTimeMillis() - start;
Assert.assertTrue(cost > timeoutMs);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 9e5ed65..ed4c1a0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -722,7 +722,7 @@ public abstract class MiniRaftCluster implements Closeable {
public void setConfiguration(RaftPeer... peers) throws IOException {
try(RaftClient client = createClient()) {
LOG.info("Start changing the configuration: {}", Arrays.asList(peers));
- final RaftClientReply reply = client.setConfiguration(peers);
+ final RaftClientReply reply = client.admin().setConfiguration(peers);
Preconditions.assertTrue(reply.isSuccess());
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 6bd11f7..53231b2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -219,7 +219,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
LOG.info("Start changing the configuration: {}",
asList(c1.allPeersInNewConf));
- RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf);
+ RaftClientReply reply = client.admin().setConfiguration(c1.allPeersInNewConf);
reconf1.set(reply.isSuccess());
PeerChanges c2 = cluster.removePeers(2, true, asList(c1.newPeers));
@@ -228,7 +228,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
LOG.info("Start changing the configuration again: {}",
asList(c2.allPeersInNewConf));
- reply = client.setConfiguration(c2.allPeersInNewConf);
+ reply = client.admin().setConfiguration(c2.allPeersInNewConf);
reconf2.set(reply.isSuccess());
latch.countDown();
@@ -301,7 +301,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
for (RaftPeer np : c1.newPeers) {
cluster.restartServer(np.getId(), false);
}
- Assert.assertTrue(client.setConfiguration(c1.allPeersInNewConf).isSuccess());
+ Assert.assertTrue(client.admin().setConfiguration(c1.allPeersInNewConf).isSuccess());
}
}
@@ -343,7 +343,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
Thread clientThread = new Thread(() -> {
try {
- RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf);
+ RaftClientReply reply = client.admin().setConfiguration(c1.allPeersInNewConf);
success.set(reply.isSuccess());
} catch (IOException ioe) {
LOG.error("FAILED", ioe);
@@ -398,7 +398,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
clientThread = new Thread(() -> {
try(final RaftClient client = cluster.createClient(leaderId)) {
for(int i = 0; clientRunning.get() && !setConf.isDone(); i++) {
- final RaftClientReply reply = client.setConfiguration(c2.allPeersInNewConf);
+ final RaftClientReply reply = client.admin().setConfiguration(c2.allPeersInNewConf);
if (reply.isSuccess()) {
setConf.complete(null);
}
@@ -478,7 +478,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
final RaftConfiguration confBefore = cluster.getLeader().getRaftConf();
// no real configuration change in the request
- final RaftClientReply reply = client.setConfiguration(cluster.getPeers().toArray(RaftPeer.emptyArray()));
+ final RaftClientReply reply = client.admin().setConfiguration(cluster.getPeers().toArray(RaftPeer.emptyArray()));
Assert.assertTrue(reply.isSuccess());
final long newCommittedIndex = leaderLog.getLastCommittedIndex();
for(long i = committedIndex + 1; i <= newCommittedIndex; i++) {
@@ -486,7 +486,6 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
Assert.assertTrue(e.hasMetadataEntry());
}
Assert.assertSame(confBefore, cluster.getLeader().getRaftConf());
- client.close();
}
}
@@ -545,7 +544,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
new Thread(() -> {
try(final RaftClient client1 = cluster.createClient(leaderId)) {
LOG.info("client1 starts to change conf");
- confChanged.set(client1.setConfiguration(newPeers).isSuccess());
+ confChanged.set(client1.admin().setConfiguration(newPeers).isSuccess());
} catch (IOException e) {
LOG.warn("Got unexpected exception when client1 changes conf", e);
}