You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by do...@apache.org on 2019/04/11 10:10:28 UTC

[incubator-iotdb] branch cluster updated: add raft utils test

This is an automated email from the ASF dual-hosted git repository.

dope pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 7e5c645  add raft utils test
7e5c645 is described below

commit 7e5c645318fab558ec1064eb8bbd1dc8b6c84be1
Author: XuYi <xu...@126.com>
AuthorDate: Thu Apr 11 18:10:17 2019 +0800

    add raft utils test
---
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  |  19 +-
 .../apache/iotdb/cluster/utils/hash/Router.java    |   2 +-
 .../apache/iotdb/cluster/utils/RaftUtilsTest.java  | 237 +++++++++++++++++++++
 .../iotdb/cluster/utils/hash/RouterTest.java       |  16 +-
 4 files changed, 264 insertions(+), 10 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 7259090..b960b9d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -26,6 +26,8 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.entity.Task;
 import com.alipay.sofa.jraft.util.Bits;
+import com.alipay.sofa.jraft.util.OnlyForTest;
+
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
@@ -83,6 +85,10 @@ public class RaftUtils {
    * Get random peer id
    */
   public static PeerId getRandomPeerID(String groupId) {
+    return getRandomPeerID(groupId, server, router);
+  }
+
+  public static PeerId getRandomPeerID(String groupId, Server server, Router router) {
     PeerId randomPeerId;
     if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) {
       RaftService service = (RaftService) server.getMetadataHolder().getService();
@@ -122,6 +128,7 @@ public class RaftUtils {
     return peerIds;
   }
 
+  @Deprecated
   public static int getIndexOfIpFromRaftNodeList(String ip, PeerId[] peerIds) {
     for (int i = 0; i < peerIds.length; i++) {
       if (peerIds[i].getIp().equals(ip)) {
@@ -134,7 +141,7 @@ public class RaftUtils {
   public static PhysicalNode[] getPhysicalNodeArrayFrom(PeerId[] peerIds) {
     PhysicalNode[] physicalNodes = new PhysicalNode[peerIds.length];
     for (int i = 0; i < peerIds.length; i++) {
-      physicalNodes[i] = new PhysicalNode(peerIds[i].getIp(), peerIds[i].getPort());
+      physicalNodes[i] = getPhysicalNodeFrom(peerIds[i]);
     }
     return physicalNodes;
   }
@@ -158,6 +165,11 @@ public class RaftUtils {
     LOGGER.info("group leader cache:{}", groupLeaderCache);
   }
 
+  @OnlyForTest
+  public static void clearRaftGroupLeader() {
+	  groupLeaderCache.clear();
+  }
+
   /**
    * Execute raft task for local processor
    *
@@ -255,6 +267,11 @@ public class RaftUtils {
    */
   public static void handleNullReadToMetaGroup(Status status) {
     SingleQPTask nullReadTask = new SingleQPTask(false, null);
+    handleNullReadToMetaGroup(status, server, nullReadTask);
+  }
+
+  public static void handleNullReadToMetaGroup(Status status, Server server,
+      SingleQPTask nullReadTask) {
     try {
       LOGGER.debug("Handle null-read in meta group for adding path request.");
       final byte[] reqContext = RaftUtils.createRaftRequestContext();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
index 496dff6..7c7b2be 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
@@ -85,7 +85,7 @@ public class Router {
   /**
    * Change this method to public for test, you should not invoke this method explicitly.
    */
-  void init() {
+  public void init() {
     reset();
     ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
     String[] hosts = config.getNodes();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
new file mode 100644
index 0000000..7205031
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.utils;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.alipay.remoting.AsyncContext;
+import com.alipay.sofa.jraft.Node;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.entity.Task;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.alipay.sofa.jraft.entity.PeerId;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
+import org.apache.iotdb.cluster.qp.callback.QPTask;
+import org.apache.iotdb.cluster.qp.callback.SingleQPTask;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
+import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
+import org.apache.iotdb.cluster.utils.hash.Router;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+public class RaftUtilsTest {
+
+  private ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+  private String[] ipListOld;
+  private int replicatorOld;
+  private int numOfVirtualNodesOld;
+  private int PORT = 7777;
+  private String[] ipList = {
+      "192.168.130.4:" + PORT,
+      "192.168.130.5:" + PORT,
+      "192.168.130.2:" + PORT,
+      "192.168.130.1:" + PORT,
+      "192.168.130.3:" + PORT
+  };
+  private int replicator = 3;
+
+  @Mock
+  private Server server;
+  @Mock
+  private MetadataRaftHolder metadataHolder;
+  @Mock
+  private RaftService service;
+  @Mock
+  private Node node;
+  @Mock
+  private QPTask qpTask;
+  @Mock
+  private BasicResponse response;
+  @Mock
+  private BasicRequest request;
+
+  @Mock
+  AsyncContext asyncContext;
+  @Mock
+  private SingleQPTask nullReadTask;
+
+  private List<PeerId> peerIds;
+
+  @Before
+  public void setUp() throws Exception {
+    peerIds = new ArrayList<>();
+    for (String addr : ipList) {
+      peerIds.add(PeerId.parsePeer(addr));
+    }
+    MockitoAnnotations.initMocks(this);
+    when(server.getMetadataHolder()).thenReturn(metadataHolder);
+    when(metadataHolder.getService()).thenReturn(service);
+    when(service.getPeerIdList()).thenReturn(peerIds);
+    when(service.getNode()).thenReturn(node);
+    Mockito.doNothing().when(node).apply(any(Task.class));
+    Mockito.doNothing().when(response).addResult(any(boolean.class));
+    Mockito.doNothing().when(response).setErrorMsg(any(String.class));
+    ipListOld = config.getNodes();
+    replicatorOld = config.getReplication();
+    numOfVirtualNodesOld = config.getNumOfVirtualNodes();
+
+    int numOfVirtualNodes = 2;
+    config.setNodes(ipList);
+    config.setReplication(replicator);
+    config.setNumOfVirtualNodes(numOfVirtualNodes);
+    Router router = Router.getInstance();
+    router.init();
+    router.showPhysicalRing();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    peerIds.clear();
+    config.setNodes(ipListOld);
+    config.setReplication(replicatorOld);
+    config.setNumOfVirtualNodes(numOfVirtualNodesOld);
+  }
+
+  @Test
+  public void testGetLeaderPeerID() {
+    RaftUtils.clearRaftGroupLeader();
+    PeerId metadtaLeader = PeerId.parsePeer(ipList[0]);
+    RaftUtils.updateRaftGroupLeader(ClusterConfig.METADATA_GROUP_ID, metadtaLeader);
+    assertEquals(metadtaLeader, RaftUtils.getLeaderPeerID(ClusterConfig.METADATA_GROUP_ID));
+
+    boolean[] isLeaderCached = {true, false, true, false, true};
+    for (int i = 0; i < ipList.length; i++) {
+      if (isLeaderCached[i]) {
+        PeerId leaderExpeted = PeerId.parsePeer(ipList[(i + 1) % ipList.length]);
+        RaftUtils.updateRaftGroupLeader(Router.DATA_GROUP_STR + i, leaderExpeted);
+        PeerId leaderActual = RaftUtils.getLeaderPeerID(Router.DATA_GROUP_STR + i);
+        assertTrue(leaderExpeted.equals(leaderActual));
+
+      } else {
+        PeerId leader = RaftUtils.getLeaderPeerID(Router.DATA_GROUP_STR + i);
+        boolean flag = false;
+        for (int j = 0; j < replicator; j++) {
+          String addr = ipList[(i + j) % ipList.length];
+          if (leader.equals(PeerId.parsePeer(addr))) {
+            flag = true;
+            break;
+          }
+        }
+        assertTrue(flag);
+      }
+    }
+    RaftUtils.clearRaftGroupLeader();
+  }
+
+  @Test
+  public void testGetRandomPeerID() {
+    Router router = Router.getInstance();
+    for (int i = 0; i < 100; i++) {
+      PeerId id = RaftUtils.getRandomPeerID(ClusterConfig.METADATA_GROUP_ID, server, router);
+      assertTrue(peerIds.contains(id));
+    }
+
+    for (int i = 0; i < 100; i++) {
+      int groudID = i % ipList.length;
+      PeerId id = RaftUtils.getRandomPeerID(Router.DATA_GROUP_STR + groudID, server, router);
+      boolean flag = false;
+      for (int j = 0; j < replicator; j++) {
+        String addr = ipList[(groudID + j) % ipList.length];
+        if (id.equals(PeerId.parsePeer(addr))) {
+          flag = true;
+          break;
+        }
+      }
+      assertTrue(flag);
+    }
+  }
+
+  @Test
+  public void testGetPeerIDFrom() {
+    PhysicalNode node = new PhysicalNode("1.2.3.4", 1234);
+    PeerId id = new PeerId("1.2.3.4", 1234);
+    assertEquals(id, RaftUtils.getPeerIDFrom(node));
+  }
+
+  @Test
+  public void testGetPhysicalNodeFrom() {
+    PeerId id = new PeerId("1.2.3.4", 1234);
+    PhysicalNode node = new PhysicalNode("1.2.3.4", 1234);
+    assertEquals(node, RaftUtils.getPhysicalNodeFrom(id));
+  }
+
+  @Test
+  public void testConvertStringArrayToPeerIdArray() {
+    PeerId[] peerIds = new PeerId[ipList.length];
+    for (int i = 0; i < ipList.length; i++) {
+      peerIds[i] = PeerId.parsePeer(ipList[i]);
+    }
+    assertArrayEquals(peerIds, RaftUtils.convertStringArrayToPeerIdArray(ipList));
+  }
+
+  @Test
+  public void testGetPhysicalNodeAndPeerIdArrayFrom() {
+    PhysicalNode[] pNodes = new PhysicalNode[ipList.length];
+    PeerId[] peerIds = new PeerId[ipList.length];
+    for (int i = 0; i < ipList.length; i++) {
+      String[] values = ipList[i].split(":");
+      pNodes[i] = new PhysicalNode(values[0], Integer.parseInt(values[1]));
+      peerIds[i] = PeerId.parsePeer(ipList[i]);
+    }
+    assertArrayEquals(pNodes, RaftUtils.getPhysicalNodeArrayFrom(peerIds));
+    assertArrayEquals(peerIds, RaftUtils.getPeerIdArrayFrom(pNodes));
+  }
+
+  @Test
+  public void testExecuteRaftTaskForLocalProcessor() throws InterruptedException, IOException {
+    DataGroupNonQueryRequest request = new DataGroupNonQueryRequest("", new ArrayList<>());
+    Mockito.doNothing().when(qpTask).await();
+    when(qpTask.getRequest()).thenReturn(request);
+    when(qpTask.getResponse()).thenReturn(null);
+    assertFalse(RaftUtils.executeRaftTaskForLocalProcessor(service, qpTask, response));
+  }
+
+  @Test
+  public void testExecuteRaftTaskForRpcProcessor() throws IOException {
+    DataGroupNonQueryRequest request = new DataGroupNonQueryRequest("", new ArrayList<>());
+    RaftUtils.executeRaftTaskForRpcProcessor(service, asyncContext, request, response);
+  }
+
+  @Test
+  public void testHandleNullReadToMetaGroup() throws InterruptedException {
+    Mockito.doNothing().when(nullReadTask).await();
+    RaftUtils.handleNullReadToMetaGroup(new Status(), server, nullReadTask);
+  }
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java
index 6a94abb..e88460c 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java
@@ -28,25 +28,25 @@ import org.junit.Test;
 
 public class RouterTest {
 
-  ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
-  String[] ipListOld;
-  int replicatorOld;
-  int numOfVirtulaNodesOld;
-  HashFunction function = new MD5Hash();
-  final int PORT = 7777;
+  private ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+  private String[] ipListOld;
+  private int replicatorOld;
+  private int numOfVirtualNodesOld;
+//  private HashFunction function = new MD5Hash();
+  private final int PORT = 7777;
 
   @Before
   public void setUp() throws Exception {
     ipListOld = config.getNodes();
     replicatorOld = config.getReplication();
-    numOfVirtulaNodesOld = config.getNumOfVirtualNodes();
+    numOfVirtualNodesOld = config.getNumOfVirtualNodes();
   }
 
   @After
   public void tearDown() throws Exception {
     config.setNodes(ipListOld);
     config.setReplication(replicatorOld);
-    config.setNumOfVirtualNodes(numOfVirtulaNodesOld);
+    config.setNumOfVirtualNodes(numOfVirtualNodesOld);
   }
 
   @Test