You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/01/05 02:26:14 UTC
[iotdb] branch expr updated: add async interface in LogDispatcher fix tests
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr by this push:
new 8d02321 add async interface in LogDispatcher fix tests
8d02321 is described below
commit 8d02321cefd293730aca8440927ecbb694867695
Author: jt <jt...@163.com>
AuthorDate: Wed Jan 5 10:25:32 2022 +0800
add async interface in LogDispatcher
fix tests
---
.../java/org/apache/iotdb/cluster/log/Log.java | 4 +-
.../apache/iotdb/cluster/log/LogDispatcher.java | 59 +++++--
.../org/apache/iotdb/cluster/log/VotingLog.java | 6 +
.../log/sequencing/SynchronousSequencer.java | 2 -
.../iotdb/cluster/server/StoppedMemberManager.java | 6 +-
.../handlers/caller/AppendGroupEntryHandler.java | 162 -----------------
.../handlers/caller/AppendNodeEntryHandler.java | 28 +--
.../cluster/server/member/DataGroupMember.java | 7 +-
.../iotdb/cluster/server/member/RaftMember.java | 17 +-
.../cluster/server/service/DataGroupEngine.java | 12 +-
.../cluster/client/ClientPoolFactoryTest.java | 2 +-
.../iotdb/cluster/common/TestMetaGroupMember.java | 19 +-
.../iotdb/cluster/log/LogDispatcherTest.java | 36 ++++
.../caller/AppendGroupEntryHandlerTest.java | 196 ---------------------
.../caller/AppendNodeEntryHandlerTest.java | 35 ++--
.../cluster/server/member/DataGroupMemberTest.java | 3 +-
.../cluster/server/member/MetaGroupMemberTest.java | 5 +-
17 files changed, 168 insertions(+), 431 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 59dd534..015b792 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -34,8 +34,8 @@ public abstract class Log implements Comparable<Log> {
// make this configurable or adaptive
private static final int DEFAULT_BUFFER_SIZE = 16 * 1024;
- private long currLogIndex;
- private long currLogTerm;
+ private long currLogIndex = -1;
+ private long currLogTerm = -1;
// for async application
private volatile boolean applied;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index ffcdb8f..2626d69 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -246,7 +246,8 @@ public class LogDispatcher {
private BlockingQueue<SendLogRequest> logBlockingDeque;
private List<SendLogRequest> currBatch = new ArrayList<>();
private Peer peer;
- Client client;
+ Client syncClient;
+ AsyncClient asyncClient;
DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
this.receiver = receiver;
@@ -255,7 +256,9 @@ public class LogDispatcher {
member
.getPeerMap()
.computeIfAbsent(receiver, r -> new Peer(member.getLogManager().getLastLogIndex()));
- client = member.getSyncClient(receiver);
+ if (!clusterConfig.isUseAsyncServer()) {
+ syncClient = member.getSyncClient(receiver);
+ }
}
@Override
@@ -320,19 +323,19 @@ public class LogDispatcher {
}
Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
- if (client == null) {
- client = member.getSyncClient(receiver);
+ if (syncClient == null) {
+ syncClient = member.getSyncClient(receiver);
}
AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
try {
- AppendEntryResult result = client.appendEntries(request);
+ AppendEntryResult result = syncClient.appendEntries(request);
Timer.Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
handler.onComplete(result);
} catch (TException e) {
- client.getInputProtocol().getTransport().close();
- ClientUtils.putBackSyncClient(client);
- client = member.getSyncClient(receiver);
+ syncClient.getInputProtocol().getTransport().close();
+ ClientUtils.putBackSyncClient(syncClient);
+ syncClient = member.getSyncClient(receiver);
logger.warn("Failed logs: {}, first index: {}", logList, request.prevLogIndex + 1);
handler.onError(e);
}
@@ -411,7 +414,7 @@ public class LogDispatcher {
}
}
- void sendLog(SendLogRequest logRequest) {
+ void sendLogSync(SendLogRequest logRequest) {
AppendNodeEntryHandler handler =
member.getAppendNodeEntryHandler(
logRequest.getVotingLog(),
@@ -420,12 +423,12 @@ public class LogDispatcher {
logRequest.newLeaderTerm,
peer,
logRequest.quorumSize);
- // TODO add async interface
+
int retries = 5;
try {
long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
for (int i = 0; i < retries; i++) {
- AppendEntryResult result = client.appendEntry(logRequest.appendEntryRequest);
+ AppendEntryResult result = syncClient.appendEntry(logRequest.appendEntryRequest);
if (result.status == Response.RESPONSE_OUT_OF_WINDOW) {
Thread.sleep(100);
} else {
@@ -435,15 +438,43 @@ public class LogDispatcher {
}
}
} catch (TException e) {
- client.getInputProtocol().getTransport().close();
- ClientUtils.putBackSyncClient(client);
- client = member.getSyncClient(receiver);
+ syncClient.getInputProtocol().getTransport().close();
+ ClientUtils.putBackSyncClient(syncClient);
+ syncClient = member.getSyncClient(receiver);
handler.onError(e);
} catch (Exception e) {
handler.onError(e);
}
}
+ private void sendLogAsync(SendLogRequest logRequest) {
+ AppendNodeEntryHandler handler =
+ member.getAppendNodeEntryHandler(
+ logRequest.getVotingLog(),
+ receiver,
+ logRequest.leaderShipStale,
+ logRequest.newLeaderTerm,
+ peer,
+ logRequest.quorumSize);
+
+ AsyncClient client = member.getAsyncClient(receiver);
+ if (client != null) {
+ try {
+ client.appendEntry(logRequest.appendEntryRequest, handler);
+ } catch (TException e) {
+ handler.onError(e);
+ }
+ }
+ }
+
+ void sendLog(SendLogRequest logRequest) {
+ if (clusterConfig.isUseAsyncServer()) {
+ sendLogAsync(logRequest);
+ } else {
+ sendLogSync(logRequest);
+ }
+ }
+
class AppendEntriesHandler implements AsyncMethodCallback<AppendEntryResult> {
private final List<AsyncMethodCallback<AppendEntryResult>> singleEntryHandlers;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index dc2d9b0..e271ead 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -26,11 +26,13 @@ public class VotingLog {
protected Log log;
protected Set<Integer> stronglyAcceptedNodeIds;
protected Set<Integer> weaklyAcceptedNodeIds;
+ protected Set<Integer> failedNodeIds;
public VotingLog(Log log, int groupSize) {
this.log = log;
stronglyAcceptedNodeIds = new HashSet<>(groupSize);
weaklyAcceptedNodeIds = new HashSet<>(groupSize);
+ failedNodeIds = new HashSet<>(groupSize);
}
public Log getLog() {
@@ -61,4 +63,8 @@ public class VotingLog {
public String toString() {
return log.toString();
}
+
+ public Set<Integer> getFailedNodeIds() {
+ return failedNodeIds;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index 1c2390a..a749960 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -31,8 +31,6 @@ import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.LogPlan;
/**
* SynchronizedSequencer performs sequencing by taking the monitor of a LogManager within the caller
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
index c8efe39..7ad56bf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
@@ -58,9 +58,11 @@ public class StoppedMemberManager {
private Map<RaftNode, DataGroupMember> removedMemberMap = new HashMap<>();
private DataGroupMember.Factory memberFactory;
+ private Node thisNode;
- public StoppedMemberManager(Factory memberFactory) {
+ public StoppedMemberManager(Factory memberFactory, Node thisNode) {
this.memberFactory = memberFactory;
+ this.thisNode = thisNode;
recover();
}
@@ -147,7 +149,7 @@ public class StoppedMemberManager {
Node node = ClusterUtils.stringToNode(split[i]);
partitionGroup.add(node);
}
- DataGroupMember member = memberFactory.create(partitionGroup);
+ DataGroupMember member = memberFactory.create(thisNode, partitionGroup);
member.setReadOnly();
removedMemberMap.put(partitionGroup.getHeader(), member);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
deleted file mode 100644
index b03cfaf..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.server.handlers.caller;
-
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.server.member.RaftMember;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
-
-/**
- * AppendGroupEntryHandler checks if the log is successfully appended by the quorum or some node has
- * rejected it for some reason when one node has finished the AppendEntryRequest. The target of the
- * log is the data groups, the consistency can be reached as long as quorum data groups agree, even
- * if the actually agreed nodes can be less than quorum, because the same nodes may say "yes" for
- * multiple groups.
- */
-public class AppendGroupEntryHandler implements AsyncMethodCallback<AppendEntryResult> {
-
- private static final Logger logger = LoggerFactory.getLogger(AppendGroupEntryHandler.class);
-
- private RaftMember member;
- private Log log;
- // the number of nodes that accept the log in each group
- // to succeed, each number should reach zero
- // for example: assuming there are 4 nodes and 3 replicas, then the initial array will be:
- // [2, 2, 2, 2]. And if node0 accepted the log, as node0 is in group 2,3,0, the array will be
- // [1, 2, 1, 1].
- private Set<Integer>[] groupReceivedNodeIds;
- // the index of the node which the request sends log to, if the node accepts the log, all
- // groups' counters the node is in should decrease
- private int receiverNodeIndex;
- private Node receiverNode;
- // store the flag of leadership lost and the new leader's term
- private AtomicBoolean leaderShipStale;
- private AtomicLong newLeaderTerm;
- private int quorumSize;
- private int replicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
-
- private AtomicInteger erroredNodeNum = new AtomicInteger(0);
-
- public AppendGroupEntryHandler(
- Set<Integer>[] groupReceivedNodeIds,
- int receiverNodeIndex,
- Node receiverNode,
- AtomicBoolean leaderShipStale,
- Log log,
- AtomicLong newLeaderTerm,
- RaftMember member,
- int quorumSize) {
- this.groupReceivedNodeIds = groupReceivedNodeIds;
- this.receiverNodeIndex = receiverNodeIndex;
- this.receiverNode = receiverNode;
- this.leaderShipStale = leaderShipStale;
- this.log = log;
- this.newLeaderTerm = newLeaderTerm;
- this.member = member;
- this.quorumSize = quorumSize;
- }
-
- @Override
- public void onComplete(AppendEntryResult response) {
- if (leaderShipStale.get()) {
- // someone has rejected this log because the leadership is stale
- return;
- }
-
- long resp = response.status;
-
- if (resp == RESPONSE_STRONG_ACCEPT) {
- processAgreement();
- } else if (resp > 0) {
- // a response > 0 is the term fo the follower
- synchronized (groupReceivedNodeIds) {
- // the leader ship is stale, abort and wait for the new leader's heartbeat
- long previousNewTerm = newLeaderTerm.get();
- if (previousNewTerm < resp) {
- newLeaderTerm.set(resp);
- }
- leaderShipStale.set(true);
- groupReceivedNodeIds.notifyAll();
- }
- }
- // rejected because the follower's logs are stale or the follower has no cluster info, just
- // wait for the heartbeat to handle
- }
-
- /**
- * Decrease all related counters of the receiver node. See the field "groupReceivedCounter" for an
- * example. If all counters reach 0, wake the waiting thread to welcome the success.
- */
- private void processAgreement() {
- synchronized (groupReceivedNodeIds) {
- logger.debug("{}: Node {} has accepted log {}", member.getName(), receiverNode, log);
- // this node is contained in REPLICATION_NUM groups, decrease the counters of these groups
- for (int i = 0; i < replicationNum; i++) {
- int nodeIndex = receiverNodeIndex - i;
- if (nodeIndex < 0) {
- nodeIndex += groupReceivedNodeIds.length;
- }
- groupReceivedNodeIds[nodeIndex].add(receiverNode.nodeIdentifier);
- }
-
- // examine if all groups has agreed
- boolean allAgreed = true;
- for (Set<Integer> receivedNodeIds : groupReceivedNodeIds) {
- if (receivedNodeIds.size() < quorumSize) {
- allAgreed = false;
- break;
- }
- }
- if (allAgreed) {
- // wake up the parent thread to welcome the new node
- groupReceivedNodeIds.notifyAll();
- }
- }
- }
-
- @Override
- public void onError(Exception exception) {
- logger.error(
- "{}: Cannot send the add node request to node {}",
- member.getName(),
- receiverNode,
- exception);
- if (erroredNodeNum.incrementAndGet() >= replicationNum / 2) {
- synchronized (groupReceivedNodeIds) {
- logger.error(
- "{}: Over half of the nodes failed, the request is rejected", member.getName());
- groupReceivedNodeIds.notifyAll();
- }
- }
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 5c6be20..24b7b20 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -35,6 +35,9 @@ import java.net.ConnectException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_LOG_MISMATCH;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_OUT_OF_WINDOW;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_WEAK_ACCEPT;
@@ -55,9 +58,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
protected Node receiver;
protected Peer peer;
protected int quorumSize;
- // initialized as the quorum size, and decrease by 1 each time when we receive a rejection or
- // an exception, upon decreased to zero, the request will be early-aborted
- private int failedDecreasingCounter;
// nano start time when the send begins
private long sendStart = Long.MIN_VALUE;
@@ -87,7 +87,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
long resp = response.status;
- if (resp == RESPONSE_STRONG_ACCEPT) {
+ if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
synchronized (log) {
log.getStronglyAcceptedNodeIds().add(receiver.nodeIdentifier);
log.notifyAll();
@@ -124,8 +124,14 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
}
} else {
// e.g., Response.RESPONSE_LOG_MISMATCH
- logger.debug(
- "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+ if (resp == RESPONSE_LOG_MISMATCH || resp == RESPONSE_OUT_OF_WINDOW) {
+ logger.debug(
+ "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+ } else {
+ logger.warn(
+ "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+ }
+
onFail();
}
// rejected because the receiver's logs are stale or the receiver has no cluster info, just
@@ -149,8 +155,8 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
private void onFail() {
synchronized (log) {
- failedDecreasingCounter--;
- if (failedDecreasingCounter <= 0) {
+ log.getFailedNodeIds().add(receiver.nodeIdentifier);
+ if (log.getFailedNodeIds().size() > quorumSize) {
// quorum members have failed, there is no need to wait for others
log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
log.notifyAll();
@@ -182,13 +188,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
this.receiverTerm = receiverTerm;
}
- public int getQuorumSize() {
- return quorumSize;
- }
-
public void setQuorumSize(int quorumSize) {
this.quorumSize = quorumSize;
- this.failedDecreasingCounter =
- ClusterDescriptor.getInstance().getConfig().getReplicationNum() - quorumSize;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index b2108be..533adf8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -197,7 +197,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
: new BlockingLogAppender.Factory();
}
- DataGroupMember(TProtocolFactory factory, PartitionGroup nodes, MetaGroupMember metaGroupMember) {
+ DataGroupMember(Node thisNode, PartitionGroup nodes, MetaGroupMember metaGroupMember) {
// The name is used in JMX, so we have to avoid to use "(" "," "=" ")"
super(
"Data-"
@@ -211,6 +211,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
ClientManager.Type.DataGroupClient));
this.metaGroupMember = metaGroupMember;
+ setThisNode(thisNode);
setAllNodes(nodes);
mbeanName =
String.format(
@@ -335,8 +336,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
this.metaGroupMember = metaGroupMember;
}
- public DataGroupMember create(PartitionGroup partitionGroup) {
- return new DataGroupMember(protocolFactory, partitionGroup, metaGroupMember);
+ public DataGroupMember create(Node thisNode, PartitionGroup partitionGroup) {
+ return new DataGroupMember(thisNode, partitionGroup, metaGroupMember);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index d71f5e3..4d69153 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -236,7 +236,7 @@ public abstract class RaftMember implements RaftMemberMBean {
* client manager that provides reusable Thrift clients to connect to other RaftMembers and
* execute RPC requests. It will be initialized according to the implementation of the subclasses
*/
- private ClientManager clientManager;
+ protected ClientManager clientManager;
/**
* when the commit progress is updated by a heartbeat, this object is notified so that we may know
* if this node is up-to-date with the leader, and whether the given consistency is reached
@@ -359,6 +359,7 @@ public abstract class RaftMember implements RaftMemberMBean {
if (heartBeatService == null) {
return;
}
+ clientManager.s
heartBeatService.shutdownNow();
catchUpService.shutdownNow();
@@ -1196,7 +1197,6 @@ public abstract class RaftMember implements RaftMemberMBean {
long startTime;
switch (appendLogResult) {
case WEAK_ACCEPT:
- // TODO: change to weak
Statistic.RAFT_WEAK_ACCEPT.add(1);
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
log.getCreateTime());
@@ -1674,14 +1674,16 @@ public abstract class RaftMember implements RaftMemberMBean {
synchronized (log) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
- while (stronglyAcceptedNodeNum < quorumSize
+ while (
+ log.getLog().getCurrLogIndex() == -1 ||
+ stronglyAcceptedNodeNum < quorumSize
&& (!(ENABLE_WEAK_ACCEPTANCE && canBeWeaklyAccepted(log.getLog()))
|| (totalAccepted < quorumSize)
|| votingLogList.size() > config.getMaxNumOfLogsInMem())
&& alreadyWait < ClusterConstant.getWriteOperationTimeoutMS()
&& !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
try {
- log.wait(0);
+ log.wait(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Unexpected interruption when sending a log", e);
@@ -1762,7 +1764,7 @@ public abstract class RaftMember implements RaftMemberMBean {
while (!log.isApplied()) {
// wait until the log is applied
try {
- log.wait(0);
+ log.wait(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LogExecutionException(e);
@@ -2289,4 +2291,9 @@ public abstract class RaftMember implements RaftMemberMBean {
public String getRaftGroupFullId() {
return (getHeader() != null ? getHeader().node.nodeIdentifier : 0) + "#" + getRaftGroupId();
}
+
+ @TestOnly
+ public void setClientManager(ClientManager clientManager) {
+ this.clientManager = clientManager;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
index 74bda7e..a36701b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
@@ -76,7 +76,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
private DataGroupEngine() {
dataMemberFactory = new DataGroupMember.Factory(protocolFactory, metaGroupMember);
- stoppedMemberManager = new StoppedMemberManager(dataMemberFactory);
+ stoppedMemberManager = new StoppedMemberManager(dataMemberFactory, thisNode);
}
public static DataGroupEngine getInstance() {
@@ -96,7 +96,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
DataGroupMember.Factory dataMemberFactory, MetaGroupMember metaGroupMember) {
DataGroupEngine.metaGroupMember = metaGroupMember;
this.dataMemberFactory = dataMemberFactory;
- this.stoppedMemberManager = new StoppedMemberManager(dataMemberFactory);
+ this.stoppedMemberManager = new StoppedMemberManager(dataMemberFactory, thisNode);
}
@Override
@@ -242,7 +242,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
}
if (partitionGroup != null && partitionGroup.contains(thisNode)) {
// the two nodes are in the same group, create a new data member
- member = dataMemberFactory.create(partitionGroup);
+ member = dataMemberFactory.create(thisNode, partitionGroup);
headerGroupMap.put(header, member);
stoppedMemberManager.remove(header);
logger.info("Created a member for header {}, group is {}", header, partitionGroup);
@@ -310,7 +310,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
if (newGroup.contains(thisNode)) {
RaftNode header = newGroup.getHeader();
logger.info("Adding this node into a new group {}", newGroup);
- DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup);
+ DataGroupMember dataGroupMember = dataMemberFactory.create(thisNode, newGroup);
dataGroupMember = addDataGroupMember(dataGroupMember, header);
dataGroupMember.pullNodeAdditionSnapshots(
((SlotPartitionTable) partitionTable).getNodeSlots(header), node);
@@ -385,7 +385,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
if (prevMember == null || !prevMember.getAllNodes().equals(partitionGroup)) {
logger.info("Building member of data group: {}", partitionGroup);
// no previous member or member changed
- DataGroupMember dataGroupMember = dataMemberFactory.create(partitionGroup);
+ DataGroupMember dataGroupMember = dataMemberFactory.create(thisNode, partitionGroup);
// the previous member will be replaced here
addDataGroupMember(dataGroupMember, header);
dataGroupMember.setUnchanged(true);
@@ -455,7 +455,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
RaftNode header = group.getHeader();
if (!headerGroupMap.containsKey(header)) {
logger.info("{} should join a new group {}", thisNode, group);
- DataGroupMember dataGroupMember = dataMemberFactory.create(group);
+ DataGroupMember dataGroupMember = dataMemberFactory.create(thisNode, group);
addDataGroupMember(dataGroupMember, header);
}
// pull new slots from the removed node
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
index f1e313e..7c10e1a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
@@ -46,7 +46,7 @@ import java.util.NoSuchElementException;
public class ClientPoolFactoryTest {
private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
- private long mockMaxWaitTimeoutMs = 10 * 1000L;
+ private long mockMaxWaitTimeoutMs = 1000L;
private int mockMaxClientPerMember = 10;
private int maxClientPerNodePerMember = clusterConfig.getMaxClientPerNodePerMember();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
index 761f5e5..8320ea7 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.cluster.common;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -27,13 +29,20 @@ public class TestMetaGroupMember extends MetaGroupMember {
public TestMetaGroupMember() {
super();
- allNodes = new PartitionGroup();
- thisNode = TestUtils.getNode(0);
- for (int i = 0; i < 10; i++) {
- allNodes.add(TestUtils.getNode(i));
- }
+
MetaSingleSnapshotLogManager manager =
new MetaSingleSnapshotLogManager(new TestLogApplier(), this);
setLogManager(manager);
+
+ PartitionGroup group = new PartitionGroup();
+ thisNode = TestUtils.getNode(0);
+ for (int i = 0; i < 10; i++) {
+ group.add(TestUtils.getNode(i));
+ }
+ setAllNodes(group);
+
+ this.clientManager = new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.MetaGroupClient);
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
index 229bdcc..1ff27b5 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.cluster.log;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.ClientManager.Type;
import org.apache.iotdb.cluster.common.TestAsyncClient;
import org.apache.iotdb.cluster.common.TestMetaGroupMember;
import org.apache.iotdb.cluster.common.TestSyncClient;
@@ -107,6 +110,38 @@ public class LogDispatcherTest {
}
@Override
+ public AsyncClient getAsyncClient(Node node) {
+ return new TestAsyncClient() {
+
+ @Override
+ public void appendEntry(AppendEntryRequest request,
+ AsyncMethodCallback<AppendEntryResult> resultHandler) throws TException {
+ try {
+ if (!downNode.contains(node)) {
+ resultHandler.onComplete(mockedAppendEntry(request));
+ }
+ resultHandler.onComplete(new AppendEntryResult(-1));
+ } catch (UnknownLogTypeException e) {
+ throw new TException(e);
+ }
+ }
+
+ @Override
+ public void appendEntries(AppendEntriesRequest request,
+ AsyncMethodCallback<AppendEntryResult> resultHandler) throws TException {
+ try {
+ if (!downNode.contains(node)) {
+ resultHandler.onComplete(mockedAppendEntries(request));
+ }
+ resultHandler.onComplete(new AppendEntryResult(-1));
+ } catch (UnknownLogTypeException e) {
+ throw new TException(e);
+ }
+ }
+ };
+ }
+
+ @Override
public Client getSyncClient(Node node) {
return new TestSyncClient() {
@Override
@@ -172,6 +207,7 @@ public class LogDispatcherTest {
boolean useAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
LogDispatcher dispatcher = new LogDispatcher(raftMember);
+ raftMember.setClientManager(new ClientManager(true, Type.MetaGroupClient));
try {
List<Log> logs = TestUtils.prepareTestLogs(10);
for (Log log : logs) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
deleted file mode 100644
index 14eac19..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.server.handlers.caller;
-
-import org.apache.iotdb.cluster.common.TestException;
-import org.apache.iotdb.cluster.common.TestLog;
-import org.apache.iotdb.cluster.common.TestMetaGroupMember;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
-import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class AppendGroupEntryHandlerTest {
-
- private int REPLICATION_NUM;
- private int prevReplicationNum;
- private RaftMember member;
-
- @Before
- public void setUp() {
- prevReplicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
- ClusterDescriptor.getInstance().getConfig().setReplicationNum(2);
- REPLICATION_NUM = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
- member = new TestMetaGroupMember();
- }
-
- @After
- public void tearDown() throws IOException, StorageEngineException {
- ClusterDescriptor.getInstance().getConfig().setReplicationNum(prevReplicationNum);
- member.stop();
- member.closeLogManager();
- EnvironmentUtils.cleanAllDir();
- }
-
- @Test
- public void testAgreement() throws InterruptedException {
- Set<Integer>[] groupReceivedCounter = new Set[10];
- for (int i = 0; i < 10; i++) {
- groupReceivedCounter[i] = new HashSet<>();
- }
- AtomicBoolean leadershipStale = new AtomicBoolean(false);
- AtomicLong newLeaderTerm = new AtomicLong(-1);
- Log testLog = new TestLog();
- synchronized (groupReceivedCounter) {
- for (int i = 0; i < 10; i += 2) {
- AppendGroupEntryHandler handler =
- new AppendGroupEntryHandler(
- groupReceivedCounter,
- i,
- TestUtils.getNode(i),
- leadershipStale,
- testLog,
- newLeaderTerm,
- member,
- REPLICATION_NUM / 2);
- new Thread(() -> handler.onComplete(new AppendEntryResult(Response.RESPONSE_AGREE)))
- .start();
- }
- groupReceivedCounter.wait();
- }
- for (int i = 0; i < 10; i++) {
- assertEquals(0, groupReceivedCounter[i]);
- }
- assertFalse(leadershipStale.get());
- assertEquals(-1, newLeaderTerm.get());
- }
-
- @Test
- public void testNoAgreement() throws InterruptedException {
- Set<Integer>[] groupReceivedCounter = new Set[10];
- for (int i = 0; i < 10; i++) {
- groupReceivedCounter[i] = new HashSet<>();
- }
- AtomicBoolean leadershipStale = new AtomicBoolean(false);
- AtomicLong newLeaderTerm = new AtomicLong(-1);
- Log testLog = new TestLog();
- synchronized (groupReceivedCounter) {
- for (int i = 0; i < 5; i++) {
- AppendGroupEntryHandler handler =
- new AppendGroupEntryHandler(
- groupReceivedCounter,
- i,
- TestUtils.getNode(i),
- leadershipStale,
- testLog,
- newLeaderTerm,
- member,
- REPLICATION_NUM / 2);
- handler.onComplete(new AppendEntryResult(Response.RESPONSE_AGREE));
- }
- }
- for (int i = 0; i < 10; i++) {
- if (i < 5) {
- assertEquals(Math.max(0, REPLICATION_NUM - (5 - i)), groupReceivedCounter[i]);
- } else {
- assertEquals(Math.min(10 - i, REPLICATION_NUM), groupReceivedCounter[i]);
- }
- }
- assertFalse(leadershipStale.get());
- assertEquals(-1, newLeaderTerm.get());
- }
-
- @Test
- public void testLeadershipStale() throws InterruptedException {
- Set<Integer>[] groupReceivedCounter = new Set[10];
- for (int i = 0; i < 10; i++) {
- groupReceivedCounter[i] = new HashSet<>();
- }
- AtomicBoolean leadershipStale = new AtomicBoolean(false);
- AtomicLong newLeaderTerm = new AtomicLong(-1);
- Log testLog = new TestLog();
- synchronized (groupReceivedCounter) {
- AppendGroupEntryHandler handler =
- new AppendGroupEntryHandler(
- groupReceivedCounter,
- 0,
- TestUtils.getNode(0),
- leadershipStale,
- testLog,
- newLeaderTerm,
- member,
- REPLICATION_NUM / 2);
- new Thread(() -> handler.onComplete(new AppendEntryResult(100L))).start();
- groupReceivedCounter.wait();
- }
- for (int i = 0; i < 10; i++) {
- assertEquals(REPLICATION_NUM / 2, groupReceivedCounter[i]);
- }
- assertTrue(leadershipStale.get());
- assertEquals(100, newLeaderTerm.get());
- }
-
- @Test
- public void testError() throws InterruptedException {
- Set<Integer>[] groupReceivedCounter = new Set[10];
- for (int i = 0; i < 10; i++) {
- groupReceivedCounter[i] = new HashSet<>();
- }
- AtomicBoolean leadershipStale = new AtomicBoolean(false);
- AtomicLong newLeaderTerm = new AtomicLong(-1);
- Log testLog = new TestLog();
-
- AppendGroupEntryHandler handler =
- new AppendGroupEntryHandler(
- groupReceivedCounter,
- 0,
- TestUtils.getNode(0),
- leadershipStale,
- testLog,
- newLeaderTerm,
- member,
- REPLICATION_NUM / 2);
- handler.onError(new TestException());
-
- for (int i = 0; i < 10; i++) {
- assertEquals(REPLICATION_NUM / 2, groupReceivedCounter[i]);
- }
- assertFalse(leadershipStale.get());
- assertEquals(-1, newLeaderTerm.get());
- }
-}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index 20f7700..b46eded 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -72,21 +72,24 @@ public class AppendNodeEntryHandlerTest {
ClusterDescriptor.getInstance().getConfig().setReplicationNum(10);
VotingLog votingLog = new VotingLog(log, 10);
Peer peer = new Peer(1);
- synchronized (votingLog) {
- for (int i = 0; i < 10; i++) {
- AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
- handler.setLeaderShipStale(leadershipStale);
- handler.setLog(votingLog);
- handler.setMember(member);
- handler.setReceiverTerm(receiverTerm);
- handler.setReceiver(TestUtils.getNode(i));
- handler.setPeer(peer);
- long resp = i >= 5 ? Response.RESPONSE_AGREE : Response.RESPONSE_LOG_MISMATCH;
- AppendEntryResult result = new AppendEntryResult();
- result.setStatus(resp);
- new Thread(() -> handler.onComplete(result)).start();
+ for (int i = 0; i < 10; i++) {
+ AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
+ handler.setLeaderShipStale(leadershipStale);
+ handler.setLog(votingLog);
+ handler.setMember(member);
+ handler.setReceiverTerm(receiverTerm);
+ handler.setReceiver(TestUtils.getNode(i));
+ handler.setPeer(peer);
+ handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
+ long resp = i >= 5 ? Response.RESPONSE_AGREE : Response.RESPONSE_LOG_MISMATCH;
+ AppendEntryResult result = new AppendEntryResult();
+ result.setStatus(resp);
+ new Thread(() -> handler.onComplete(result)).start();
+ }
+ while (votingLog.getStronglyAcceptedNodeIds().size() < 5) {
+ synchronized (votingLog) {
+ votingLog.wait(1);
}
- votingLog.wait();
}
assertEquals(-1, receiverTerm.get());
assertFalse(leadershipStale.get());
@@ -112,6 +115,7 @@ public class AppendNodeEntryHandlerTest {
handler.setReceiverTerm(receiverTerm);
handler.setReceiver(TestUtils.getNode(i));
handler.setPeer(peer);
+ handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
AppendEntryResult result = new AppendEntryResult();
result.setStatus(Response.RESPONSE_AGREE);
handler.onComplete(result);
@@ -138,6 +142,7 @@ public class AppendNodeEntryHandlerTest {
handler.setReceiverTerm(receiverTerm);
handler.setReceiver(TestUtils.getNode(0));
handler.setPeer(peer);
+ handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
new Thread(() -> handler.onComplete(new AppendEntryResult(100L))).start();
votingLog.wait();
}
@@ -164,11 +169,13 @@ public class AppendNodeEntryHandlerTest {
handler.setReceiverTerm(receiverTerm);
handler.setReceiver(TestUtils.getNode(0));
handler.setPeer(peer);
+ handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
handler.onError(new TestException());
assertEquals(-1, receiverTerm.get());
assertFalse(leadershipStale.get());
assertEquals(0, votingLog.getStronglyAcceptedNodeIds().size());
+
} finally {
ClusterDescriptor.getInstance().getConfig().setReplicationNum(replicationNum);
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index c3db432..b2f8e01 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -196,7 +196,7 @@ public class DataGroupMemberTest extends BaseMember {
private DataGroupMember getDataGroupMember(Node node, PartitionGroup nodes) {
DataGroupMember dataGroupMember =
- new DataGroupMember(new Factory(), nodes, testMetaMember) {
+ new DataGroupMember(node, nodes, testMetaMember) {
@Override
public boolean syncLeader(CheckConsistency checkConsistency) {
return true;
@@ -290,7 +290,6 @@ public class DataGroupMemberTest extends BaseMember {
}
};
PartitionedSnapshotLogManager logManager = getLogManager(nodes, dataGroupMember);
- dataGroupMember.setThisNode(node);
dataGroupMember.setLogManager(logManager);
dataGroupMember.setLeader(node);
dataGroupMember.setCharacter(NodeCharacter.LEADER);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 8ca9a7b..4e4f82f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -85,7 +85,6 @@ import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
import org.apache.iotdb.db.auth.entity.Role;
import org.apache.iotdb.db.auth.entity.User;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -189,7 +188,7 @@ public class MetaGroupMemberTest extends BaseMember {
new DataGroupEngine(
new DataGroupMember.Factory(new Factory(), testMetaMember) {
@Override
- public DataGroupMember create(PartitionGroup partitionGroup) {
+ public DataGroupMember create(Node thisNode, PartitionGroup partitionGroup) {
return getDataGroupMember(partitionGroup, TestUtils.getNode(0));
}
},
@@ -228,7 +227,7 @@ public class MetaGroupMemberTest extends BaseMember {
private DataGroupMember getDataGroupMember(PartitionGroup group, Node node) {
DataGroupMember dataGroupMember =
- new DataGroupMember(new Factory(), group, testMetaMember) {
+ new DataGroupMember(node, group, testMetaMember) {
@Override
public boolean syncLeader(CheckConsistency checkConsistency) {
return true;