You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/05/21 03:03:53 UTC
[incubator-iotdb] branch cluster updated: make nodetool get
accurate leader of data group
This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new ce845f2 make nodetool get accurate leader of data group
new 99ffb6b Merge remote-tracking branch 'origin/cluster' into cluster
ce845f2 is described below
commit ce845f201300b6bf36b91d22778db51c160327e4
Author: mdf369 <95...@qq.com>
AuthorDate: Tue May 21 11:03:22 2019 +0800
make nodetool get accurate leader of data group
---
.../org/apache/iotdb/cluster/entity/Server.java | 2 +
.../cluster/qp/executor/AbstractQPExecutor.java | 2 +-
.../cluster/qp/executor/NonQueryExecutor.java | 2 +-
.../apache/iotdb/cluster/qp/task/BatchQPTask.java | 2 +-
.../nonquery/DataGroupNonQueryAsyncProcessor.java | 2 +-
.../nonquery/MetaGroupNonQueryAsyncProcessor.java | 2 +-
.../querymetric/QueryLeaderAsyncProcessor.java | 45 ++++++++++++++++
.../request/querymetric/QueryLeaderRequest.java | 31 +++++++++++
.../response/querymetric/QueryLeaderResponse.java | 47 +++++++++++++++++
.../iotdb/cluster/utils/QPExecutorUtils.java | 2 +-
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 60 ++++++++++++++++++++--
.../apache/iotdb/cluster/utils/RaftUtilsTest.java | 6 +--
12 files changed, 189 insertions(+), 14 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 1567bcd..70d2c26 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryJobNumAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryLeaderAsyncProcessor;
import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryMetricAsyncProcessor;
import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.DataGroupNonQueryAsyncProcessor;
import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.MetaGroupNonQueryAsyncProcessor;
@@ -167,6 +168,7 @@ public class Server {
rpcServer.registerUserProcessor(new QueryMetricAsyncProcessor());
rpcServer.registerUserProcessor(new QueryJobNumAsyncProcessor());
rpcServer.registerUserProcessor(new QueryStatusAsyncProcessor());
+ rpcServer.registerUserProcessor(new QueryLeaderAsyncProcessor());
}
public void stop() throws ProcessorException, InterruptedException {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 1f0648c..9841781 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -136,7 +136,7 @@ public abstract class AbstractQPExecutor {
String groupId = task.getRequest().getGroupID();
RaftUtils.removeCachedRaftGroupLeader(groupId);
LOGGER.debug("Remove cached raft group leader of {}", groupId);
- leader = RaftUtils.getLeaderPeerID(groupId);
+ leader = RaftUtils.getLocalLeaderPeerID(groupId);
}
task.resetTask();
return asyncHandleNonQuerySingleTaskGetRes(task, leader, taskRetryNum + 1);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 1420370..75fc3a8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -326,7 +326,7 @@ public class NonQueryExecutor extends AbstractQPExecutor {
if (QPExecutorUtils.canHandleNonQueryByGroupId(groupId)) {
return handleNonQueryRequestLocally(groupId, qpTask);
} else {
- PeerId leader = RaftUtils.getLeaderPeerID(groupId);
+ PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
return asyncHandleNonQueryTask(qpTask, leader);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index 43edd67..667a55e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@ -126,7 +126,7 @@ public class BatchQPTask extends MultiQPTask {
taskThread = QPTaskManager.getInstance()
.submit(() -> executeLocalSubTask(subTask, groupId));
} else {
- PeerId leader = RaftUtils.getLeaderPeerID(groupId);
+ PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
taskThread = QPTaskManager.getInstance()
.submit(() -> executeRpcSubTask(subTask, leader, groupId));
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
index d3537fe..c331714 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
@@ -49,7 +49,7 @@ public class DataGroupNonQueryAsyncProcessor extends
String groupId = request.getGroupID();
DataPartitionRaftHolder dataPartitionRaftHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
if (!dataPartitionRaftHolder.getFsm().isLeader()) {
- PeerId leader = RaftUtils.getLeaderPeerID(groupId);
+ PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
LOGGER.debug("Request need to redirect leader: {}, groupId : {} ", leader, groupId);
DataGroupNonQueryResponse response = DataGroupNonQueryResponse
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java
index 9f09bbb..95f9e32 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java
@@ -49,7 +49,7 @@ public class MetaGroupNonQueryAsyncProcessor extends
String groupId = request.getGroupID();
MetadataRaftHolder metadataHolder = RaftUtils.getMetadataRaftHolder();
if (!metadataHolder.getFsm().isLeader()) {
- PeerId leader = RaftUtils.getLeaderPeerID(groupId);
+ PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
LOGGER.debug("Request need to redirect leader: {}, groupId : {} ", leader, groupId);
MetaGroupNonQueryResponse response = MetaGroupNonQueryResponse
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetric/QueryLeaderAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetric/QueryLeaderAsyncProcessor.java
new file mode 100644
index 0000000..9c5a2bf
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetric/QueryLeaderAsyncProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.processor.querymetric;
+
+import com.alipay.remoting.AsyncContext;
+import com.alipay.remoting.BizContext;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryLeaderRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetric.QueryLeaderResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+
+/**
+ * Async RPC processor for {@link QueryLeaderRequest}: replies with the leader peer that
+ * {@code RaftUtils.getLocalLeaderPeerID} returns on this node for the requested group.
+ */
+public class QueryLeaderAsyncProcessor extends BasicAsyncUserProcessor<QueryLeaderRequest> {
+
+ /** Returns the fully qualified class name of {@link QueryLeaderRequest}. */
+ @Override
+ public String interest() {
+ return QueryLeaderRequest.class.getName();
+ }
+
+ /**
+ * Reads the group id from the request, looks up this node's view of the group leader and sends
+ * it back as a success response.
+ */
+ @Override
+ public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
+ QueryLeaderRequest request) {
+ final String group = request.getGroupID();
+ final QueryLeaderResponse reply =
+ QueryLeaderResponse.createSuccessResponse(group, RaftUtils.getLocalLeaderPeerID(group));
+ reply.addResult(true);
+ asyncContext.sendResponse(reply);
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetric/QueryLeaderRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetric/QueryLeaderRequest.java
new file mode 100644
index 0000000..a3a2c06
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetric/QueryLeaderRequest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request.querymetric;
+
+import java.io.Serializable;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
+
+/**
+ * Request sent to a node to ask which peer it regards as the leader of a raft group. It carries
+ * nothing beyond the group id held by {@link BasicRequest}.
+ */
+public class QueryLeaderRequest extends BasicRequest implements Serializable {
+
+ private static final long serialVersionUID = 8438291563829380829L;
+
+ /**
+ * @param groupID id of the raft group whose leader is being queried
+ */
+ public QueryLeaderRequest(final String groupID) {
+ super(groupID);
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetric/QueryLeaderResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetric/QueryLeaderResponse.java
new file mode 100644
index 0000000..ad536aa
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetric/QueryLeaderResponse.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response.querymetric;
+
+import com.alipay.sofa.jraft.entity.PeerId;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
+
+/**
+ * Response to a leader query. A success response carries the leader {@link PeerId}; an error
+ * response carries an error message and no leader.
+ */
+public class QueryLeaderResponse extends BasicResponse {
+
+ /** Leader reported by the responding node; stays null on error responses. */
+ private PeerId leader;
+
+ private QueryLeaderResponse(String groupId, boolean redirected, String leaderStr,
+ String errorMsg) {
+ super(groupId, redirected, leaderStr, errorMsg);
+ }
+
+ /** @return the leader set by {@link #createSuccessResponse}, or null if none was set */
+ public PeerId getLeader() {
+ return leader;
+ }
+
+ /** Builds a non-redirected response that only carries {@code errorMsg}. */
+ public static QueryLeaderResponse createErrorResponse(String groupId, String errorMsg) {
+ return new QueryLeaderResponse(groupId, false, null, errorMsg);
+ }
+
+ /** Builds a non-redirected response without error message that carries {@code leader}. */
+ public static QueryLeaderResponse createSuccessResponse(String groupId, PeerId leader) {
+ final QueryLeaderResponse success = new QueryLeaderResponse(groupId, false, null, null);
+ success.leader = leader;
+ return success;
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
index 5a60351..811928c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
@@ -124,7 +124,7 @@ public class QPExecutorUtils {
public static boolean checkDataGroupLeader(String groupId) {
boolean isLeader = false;
if (router.containPhysicalNodeByGroupId(groupId, localNode) && RaftUtils
- .getPhysicalNodeFrom(RaftUtils.getLeaderPeerID(groupId)).equals(localNode)) {
+ .getPhysicalNodeFrom(RaftUtils.getLocalLeaderPeerID(groupId)).equals(localNode)) {
isLeader = true;
}
return isLeader;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 5f33652..9b59d6f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -61,10 +61,12 @@ import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
import org.apache.iotdb.cluster.rpc.raft.request.BasicNonQueryRequest;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryJobNumRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryLeaderRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryMetricRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryStatusRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
import org.apache.iotdb.cluster.rpc.raft.response.querymetric.QueryJobNumResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetric.QueryLeaderResponse;
import org.apache.iotdb.cluster.rpc.raft.response.querymetric.QueryMetricResponse;
import org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
import org.apache.iotdb.cluster.rpc.raft.response.nonquery.MetaGroupNonQueryResponse;
@@ -92,7 +94,7 @@ public class RaftUtils {
/**
* The cache will be update in two case: 1. When @onLeaderStart() method of state machine is
- * called, the cache will be update. 2. When @getLeaderPeerID() in this class is called and cache
+ * called, the cache will be update. 2. When @getLocalLeaderPeerID() in this class is called and cache
* don't have the key, it's will get random peer and update. 3. When @redirected of BasicRequest
* is true, the task will be retry and the cache will update.
*/
@@ -161,12 +163,44 @@ public class RaftUtils {
*
* @return leader id
*/
- public static PeerId getLeaderPeerID(String groupId) {
+ public static PeerId getLocalLeaderPeerID(String groupId) {
if (!groupLeaderCache.containsKey(groupId)) {
PeerId randomPeerId = getRandomPeerID(groupId);
groupLeaderCache.put(groupId, randomPeerId);
}
- return groupLeaderCache.get(groupId);
+ PeerId leader = groupLeaderCache.get(groupId);
+ LOGGER.debug("Get local cached leader {} of group {}.", leader, groupId);
+ return leader;
+ }
+
+ /**
+ * Ask a remote node which peer it regards as the leader of a group. A QueryLeaderRequest is sent
+ * to the given node and the calling thread blocks until the task completes.
+ *
+ * @param peerId the remote node to ask
+ * @param groupId id of the raft group whose leader is wanted
+ * @return the leader carried by the remote node's response, or null if the task did not reach
+ * the FINISH state, the response was null, the connection failed or the wait was interrupted
+ */
+ public static PeerId getLeaderPeerIDFromRemoteNode(PeerId peerId, String groupId) {
+ QueryLeaderRequest request = new QueryLeaderRequest(groupId);
+ SingleQPTask task = new SingleQPTask(false, request);
+
+ LOGGER.debug("Execute get leader of group {} from node {}.", groupId, peerId);
+ try {
+ NodeAsClient client = RaftNodeAsClientManager.getInstance().getRaftNodeAsClient();
+ // Fire the request asynchronously, then block on the task until it completes.
+ client.asyncHandleRequest(task.getRequest(), peerId, task);
+
+ task.await();
+ PeerId leader = null;
+ if (task.getTaskState() == TaskState.FINISH) {
+ BasicResponse response = task.getResponse();
+ leader = response == null ? null : ((QueryLeaderResponse) response).getLeader();
+ }
+ LOGGER.debug("Get leader {} of group {} from node {}.", leader, groupId, peerId);
+ return leader;
+ } catch (RaftConnectionException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ // Catching InterruptedException clears the interrupt status; restore it so that callers
+ // further up the stack can still observe the interruption.
+ Thread.currentThread().interrupt();
+ }
+ LOGGER.error("Fail to get leader of group {} from remote node {} because of {}.", groupId, peerId, e.getMessage());
+ return null;
+ }
}
/**
@@ -465,7 +499,23 @@ public class RaftUtils {
groupId = router.getGroupID(group);
nodes = getPeerIdArrayFrom(group);
}
- PeerId leader = RaftUtils.getLeaderPeerID(groupId);
+
+ PeerId leader = null;
+ for (PeerId node : nodes) {
+ LOGGER.debug("Try to get leader of group {} from node {}.", groupId, node);
+ leader = getLeaderPeerIDFromRemoteNode(node, groupId);
+ LOGGER.debug("Get leader {} of group {} from node {}.", leader, groupId, node);
+ if (leader != null) {
+ break;
+ }
+ }
+
+ if (leader == null) {
+ LOGGER.debug("Fail to get leader of group {} from all remote nodes, get it locally.", groupId);
+ leader = RaftUtils.getLocalLeaderPeerID(groupId);
+ LOGGER.debug("Get leader {} of group {} locally.", leader, groupId);
+ }
+
for (int i = 0; i < nodes.length; i++) {
if (leader.equals(nodes[i])) {
PeerId t = nodes[i];
@@ -608,7 +658,7 @@ public class RaftUtils {
SingleQPTask task = new SingleQPTask(false, request);
LOGGER.debug("Execute get metric for {} statement for group {}.", metric, groupId);
- PeerId holder = RaftUtils.getLeaderPeerID(groupId);
+ PeerId holder = RaftUtils.getLocalLeaderPeerID(groupId);
LOGGER.debug("Get metric from node {}.", holder);
try {
NodeAsClient client = RaftNodeAsClientManager.getInstance().getRaftNodeAsClient();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
index e7a4201..b6866ad 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
@@ -150,18 +150,18 @@ public class RaftUtilsTest {
RaftUtils.clearRaftGroupLeader();
PeerId metadtaLeader = PeerId.parsePeer(ipList[0]);
RaftUtils.updateRaftGroupLeader(ClusterConfig.METADATA_GROUP_ID, metadtaLeader);
- assertEquals(metadtaLeader, RaftUtils.getLeaderPeerID(ClusterConfig.METADATA_GROUP_ID));
+ assertEquals(metadtaLeader, RaftUtils.getLocalLeaderPeerID(ClusterConfig.METADATA_GROUP_ID));
boolean[] isLeaderCached = {true, false, true, false, true};
for (int i = 0; i < ipList.length; i++) {
if (isLeaderCached[i]) {
PeerId leaderExpeted = PeerId.parsePeer(ipList[(i + 1) % ipList.length]);
RaftUtils.updateRaftGroupLeader(Router.DATA_GROUP_STR + i, leaderExpeted);
- PeerId leaderActual = RaftUtils.getLeaderPeerID(Router.DATA_GROUP_STR + i);
+ PeerId leaderActual = RaftUtils.getLocalLeaderPeerID(Router.DATA_GROUP_STR + i);
assertTrue(leaderExpeted.equals(leaderActual));
} else {
- PeerId leader = RaftUtils.getLeaderPeerID(Router.DATA_GROUP_STR + i);
+ PeerId leader = RaftUtils.getLocalLeaderPeerID(Router.DATA_GROUP_STR + i);
boolean flag = false;
for (int j = 0; j < replicator; j++) {
String addr = ipList[(i + j) % ipList.length];