You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by do...@apache.org on 2019/04/11 10:10:28 UTC
[incubator-iotdb] branch cluster updated: add raft utils test
This is an automated email from the ASF dual-hosted git repository.
dope pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 7e5c645 add raft utils test
7e5c645 is described below
commit 7e5c645318fab558ec1064eb8bbd1dc8b6c84be1
Author: XuYi <xu...@126.com>
AuthorDate: Thu Apr 11 18:10:17 2019 +0800
add raft utils test
---
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 19 +-
.../apache/iotdb/cluster/utils/hash/Router.java | 2 +-
.../apache/iotdb/cluster/utils/RaftUtilsTest.java | 237 +++++++++++++++++++++
.../iotdb/cluster/utils/hash/RouterTest.java | 16 +-
4 files changed, 264 insertions(+), 10 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 7259090..b960b9d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -26,6 +26,8 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.util.Bits;
+import com.alipay.sofa.jraft.util.OnlyForTest;
+
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@@ -83,6 +85,10 @@ public class RaftUtils {
* Get random peer id
*/
public static PeerId getRandomPeerID(String groupId) {
+ return getRandomPeerID(groupId, server, router);
+ }
+
+ public static PeerId getRandomPeerID(String groupId, Server server, Router router) {
PeerId randomPeerId;
if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) {
RaftService service = (RaftService) server.getMetadataHolder().getService();
@@ -122,6 +128,7 @@ public class RaftUtils {
return peerIds;
}
+ @Deprecated
public static int getIndexOfIpFromRaftNodeList(String ip, PeerId[] peerIds) {
for (int i = 0; i < peerIds.length; i++) {
if (peerIds[i].getIp().equals(ip)) {
@@ -134,7 +141,7 @@ public class RaftUtils {
public static PhysicalNode[] getPhysicalNodeArrayFrom(PeerId[] peerIds) {
PhysicalNode[] physicalNodes = new PhysicalNode[peerIds.length];
for (int i = 0; i < peerIds.length; i++) {
- physicalNodes[i] = new PhysicalNode(peerIds[i].getIp(), peerIds[i].getPort());
+ physicalNodes[i] = getPhysicalNodeFrom(peerIds[i]);
}
return physicalNodes;
}
@@ -158,6 +165,11 @@ public class RaftUtils {
LOGGER.info("group leader cache:{}", groupLeaderCache);
}
+ @OnlyForTest
+ public static void clearRaftGroupLeader() {
+ groupLeaderCache.clear();
+ }
+
/**
* Execute raft task for local processor
*
@@ -255,6 +267,11 @@ public class RaftUtils {
*/
public static void handleNullReadToMetaGroup(Status status) {
SingleQPTask nullReadTask = new SingleQPTask(false, null);
+ handleNullReadToMetaGroup(status, server, nullReadTask);
+ }
+
+ public static void handleNullReadToMetaGroup(Status status, Server server,
+ SingleQPTask nullReadTask) {
try {
LOGGER.debug("Handle null-read in meta group for adding path request.");
final byte[] reqContext = RaftUtils.createRaftRequestContext();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
index 496dff6..7c7b2be 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
@@ -85,7 +85,7 @@ public class Router {
/**
* This method is public only so that tests can call it; do not invoke it explicitly elsewhere.
*/
- void init() {
+ public void init() {
reset();
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
String[] hosts = config.getNodes();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
new file mode 100644
index 0000000..7205031
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.utils;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.alipay.remoting.AsyncContext;
+import com.alipay.sofa.jraft.Node;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.entity.Task;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.alipay.sofa.jraft.entity.PeerId;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
+import org.apache.iotdb.cluster.qp.callback.QPTask;
+import org.apache.iotdb.cluster.qp.callback.SingleQPTask;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
+import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
+import org.apache.iotdb.cluster.utils.hash.Router;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+public class RaftUtilsTest {
+
+ private ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ private String[] ipListOld;
+ private int replicatorOld;
+ private int numOfVirtualNodesOld;
+ private int PORT = 7777;
+ private String[] ipList = {
+ "192.168.130.4:" + PORT,
+ "192.168.130.5:" + PORT,
+ "192.168.130.2:" + PORT,
+ "192.168.130.1:" + PORT,
+ "192.168.130.3:" + PORT
+ };
+ private int replicator = 3;
+
+ @Mock
+ private Server server;
+ @Mock
+ private MetadataRaftHolder metadataHolder;
+ @Mock
+ private RaftService service;
+ @Mock
+ private Node node;
+ @Mock
+ private QPTask qpTask;
+ @Mock
+ private BasicResponse response;
+ @Mock
+ private BasicRequest request;
+
+ @Mock
+ AsyncContext asyncContext;
+ @Mock
+ private SingleQPTask nullReadTask;
+
+ private List<PeerId> peerIds;
+
+ /**
+  * Prepares the fixture: saves the current cluster configuration, parses {@code ipList} into
+  * peer ids, stubs the mocks so that the server exposes the metadata holder, its raft service
+  * and the service's node, then installs the test configuration and re-initializes the router.
+  */
+ @Before
+ public void setUp() throws Exception {
+   // Capture the values currently in the shared config so tearDown can restore them.
+   ipListOld = config.getNodes();
+   replicatorOld = config.getReplication();
+   numOfVirtualNodesOld = config.getNumOfVirtualNodes();
+
+   peerIds = new ArrayList<>();
+   for (int i = 0; i < ipList.length; i++) {
+     peerIds.add(PeerId.parsePeer(ipList[i]));
+   }
+
+   MockitoAnnotations.initMocks(this);
+   when(server.getMetadataHolder()).thenReturn(metadataHolder);
+   when(metadataHolder.getService()).thenReturn(service);
+   when(service.getPeerIdList()).thenReturn(peerIds);
+   when(service.getNode()).thenReturn(node);
+   Mockito.doNothing().when(node).apply(any(Task.class));
+   Mockito.doNothing().when(response).addResult(any(boolean.class));
+   Mockito.doNothing().when(response).setErrorMsg(any(String.class));
+
+   // Install the test topology before the router rebuilds itself from the config.
+   final int virtualNodeCount = 2;
+   config.setNodes(ipList);
+   config.setReplication(replicator);
+   config.setNumOfVirtualNodes(virtualNodeCount);
+
+   Router sharedRouter = Router.getInstance();
+   sharedRouter.init();
+   sharedRouter.showPhysicalRing();
+ }
+
+ /**
+  * Restores the cluster configuration captured in setUp and discards per-test state.
+  */
+ @After
+ public void tearDown() throws Exception {
+   peerIds.clear();
+   // Leaders are kept in a static cache inside RaftUtils. Clearing it here guarantees that a
+   // test which fails before reaching its own cleanup cannot leak cached leaders into others.
+   RaftUtils.clearRaftGroupLeader();
+   config.setNodes(ipListOld);
+   config.setReplication(replicatorOld);
+   config.setNumOfVirtualNodes(numOfVirtualNodesOld);
+ }
+
+ /**
+  * Verifies leader lookup. A leader registered through {@code updateRaftGroupLeader} must be
+  * returned unchanged. For a data group without a registered leader, the returned peer must be
+  * one of the {@code replicator} consecutive entries of {@code ipList} starting at the group's
+  * index (wrapping around at the end of the list).
+  */
+ @Test
+ public void testGetLeaderPeerID() {
+   RaftUtils.clearRaftGroupLeader();
+   PeerId metadataLeader = PeerId.parsePeer(ipList[0]);
+   RaftUtils.updateRaftGroupLeader(ClusterConfig.METADATA_GROUP_ID, metadataLeader);
+   assertEquals(metadataLeader, RaftUtils.getLeaderPeerID(ClusterConfig.METADATA_GROUP_ID));
+
+   // true: a leader is registered for data group i before the lookup; false: it is not.
+   boolean[] isLeaderCached = {true, false, true, false, true};
+   for (int i = 0; i < ipList.length; i++) {
+     String groupId = Router.DATA_GROUP_STR + i;
+     if (isLeaderCached[i]) {
+       PeerId expectedLeader = PeerId.parsePeer(ipList[(i + 1) % ipList.length]);
+       RaftUtils.updateRaftGroupLeader(groupId, expectedLeader);
+       PeerId actualLeader = RaftUtils.getLeaderPeerID(groupId);
+       assertEquals("cached leader of " + groupId, expectedLeader, actualLeader);
+     } else {
+       PeerId leader = RaftUtils.getLeaderPeerID(groupId);
+       boolean isGroupMember = false;
+       for (int j = 0; j < replicator; j++) {
+         String addr = ipList[(i + j) % ipList.length];
+         // Compare from the parsed side so a null leader fails the assertion below
+         // instead of raising a NullPointerException here.
+         if (PeerId.parsePeer(addr).equals(leader)) {
+           isGroupMember = true;
+           break;
+         }
+       }
+       assertTrue("leader " + leader + " is not a member of " + groupId, isGroupMember);
+     }
+   }
+   RaftUtils.clearRaftGroupLeader();
+ }
+
+ /**
+  * Verifies random peer selection. For the metadata group the peer must come from the peer
+  * list served by the mocked raft service. For a data group it must be one of the
+  * {@code replicator} consecutive entries of {@code ipList} starting at the group's index
+  * (wrapping around at the end of the list). Each case is repeated 100 times because the
+  * selection is random.
+  */
+ @Test
+ public void testGetRandomPeerID() {
+   Router router = Router.getInstance();
+   for (int i = 0; i < 100; i++) {
+     PeerId id = RaftUtils.getRandomPeerID(ClusterConfig.METADATA_GROUP_ID, server, router);
+     assertTrue("peer " + id + " is not in the metadata peer list", peerIds.contains(id));
+   }
+
+   for (int i = 0; i < 100; i++) {
+     int groupIndex = i % ipList.length;
+     String groupId = Router.DATA_GROUP_STR + groupIndex;
+     PeerId id = RaftUtils.getRandomPeerID(groupId, server, router);
+     boolean isGroupMember = false;
+     for (int j = 0; j < replicator; j++) {
+       String addr = ipList[(groupIndex + j) % ipList.length];
+       // Compare from the parsed side so a null id fails the assertion below
+       // instead of raising a NullPointerException here.
+       if (PeerId.parsePeer(addr).equals(id)) {
+         isGroupMember = true;
+         break;
+       }
+     }
+     assertTrue("peer " + id + " is not a member of " + groupId, isGroupMember);
+   }
+ }
+
+ /**
+  * A physical node converted with {@code getPeerIDFrom} must equal a peer id built from the
+  * same ip and port.
+  */
+ @Test
+ public void testGetPeerIDFrom() {
+   final String ip = "1.2.3.4";
+   final int port = 1234;
+   PhysicalNode source = new PhysicalNode(ip, port);
+   PeerId expected = new PeerId(ip, port);
+   PeerId actual = RaftUtils.getPeerIDFrom(source);
+   assertEquals(expected, actual);
+ }
+
+ /**
+  * A peer id converted with {@code getPhysicalNodeFrom} must equal a physical node built from
+  * the same ip and port.
+  */
+ @Test
+ public void testGetPhysicalNodeFrom() {
+   final String ip = "1.2.3.4";
+   final int port = 1234;
+   PeerId source = new PeerId(ip, port);
+   PhysicalNode expected = new PhysicalNode(ip, port);
+   PhysicalNode actual = RaftUtils.getPhysicalNodeFrom(source);
+   assertEquals(expected, actual);
+ }
+
+ /**
+  * Converting the address array must give the same peer ids, in the same order, as parsing
+  * each address individually.
+  */
+ @Test
+ public void testConvertStringArrayToPeerIdArray() {
+   PeerId[] expected = new PeerId[ipList.length];
+   int next = 0;
+   for (String address : ipList) {
+     expected[next++] = PeerId.parsePeer(address);
+   }
+   PeerId[] actual = RaftUtils.convertStringArrayToPeerIdArray(ipList);
+   assertArrayEquals(expected, actual);
+ }
+
+ @Test
+ public void testGetPhysicalNodeAndPeerIdArrayFrom() {
+ PhysicalNode[] pNodes = new PhysicalNode[ipList.length];
+ PeerId[] peerIds = new PeerId[ipList.length];
+ for (int i = 0; i < ipList.length; i++) {
+ String[] values = ipList[i].split(":");
+ pNodes[i] = new PhysicalNode(values[0], Integer.parseInt(values[1]));
+ peerIds[i] = PeerId.parsePeer(ipList[i]);
+ }
+ assertArrayEquals(pNodes, RaftUtils.getPhysicalNodeArrayFrom(peerIds));
+ assertArrayEquals(peerIds, RaftUtils.getPeerIdArrayFrom(pNodes));
+ }
+
+ @Test
+ public void testExecuteRaftTaskForLocalProcessor() throws InterruptedException, IOException {
+ DataGroupNonQueryRequest request = new DataGroupNonQueryRequest("", new ArrayList<>());
+ Mockito.doNothing().when(qpTask).await();
+ when(qpTask.getRequest()).thenReturn(request);
+ when(qpTask.getResponse()).thenReturn(null);
+ assertFalse(RaftUtils.executeRaftTaskForLocalProcessor(service, qpTask, response));
+ }
+
+ /**
+  * Smoke test: submits an empty non-query request through the RPC path using mocked
+  * collaborators. There are no assertions; the test passes if the call does not throw.
+  */
+ @Test
+ public void testExecuteRaftTaskForRpcProcessor() throws IOException {
+   DataGroupNonQueryRequest nonQueryRequest = new DataGroupNonQueryRequest("", new ArrayList<>());
+   RaftUtils.executeRaftTaskForRpcProcessor(service, asyncContext, nonQueryRequest, response);
+ }
+
+ @Test
+ public void testHandleNullReadToMetaGroup() throws InterruptedException {
+ Mockito.doNothing().when(nullReadTask).await();
+ RaftUtils.handleNullReadToMetaGroup(new Status(), server, nullReadTask);
+ }
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java
index 6a94abb..e88460c 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java
@@ -28,25 +28,25 @@ import org.junit.Test;
public class RouterTest {
- ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- String[] ipListOld;
- int replicatorOld;
- int numOfVirtulaNodesOld;
- HashFunction function = new MD5Hash();
- final int PORT = 7777;
+ private ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ private String[] ipListOld;
+ private int replicatorOld;
+ private int numOfVirtualNodesOld;
+// private HashFunction function = new MD5Hash();
+ private final int PORT = 7777;
@Before
public void setUp() throws Exception {
ipListOld = config.getNodes();
replicatorOld = config.getReplication();
- numOfVirtulaNodesOld = config.getNumOfVirtualNodes();
+ numOfVirtualNodesOld = config.getNumOfVirtualNodes();
}
@After
public void tearDown() throws Exception {
config.setNodes(ipListOld);
config.setReplication(replicatorOld);
- config.setNumOfVirtualNodes(numOfVirtulaNodesOld);
+ config.setNumOfVirtualNodes(numOfVirtualNodesOld);
}
@Test