Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/06/07 02:22:28 UTC

[GitHub] [iotdb] neuyilan opened a new pull request #3364: Fix the session can not remove the broken cached leader connection.

neuyilan opened a new pull request #3364:
URL: https://github.com/apache/iotdb/pull/3364


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] neuyilan commented on a change in pull request #3364: Remove the broken cached leader connection & optimize the insertRecords method in session

Posted by GitBox <gi...@apache.org>.
neuyilan commented on a change in pull request #3364:
URL: https://github.com/apache/iotdb/pull/3364#discussion_r647074712



##########
File path: session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
##########
@@ -738,6 +1073,8 @@ private void addLine(
 
   static class MockSession extends Session {
 
+    private MockSessionConnection mockSessionConnection;

Review comment:
       I have changed it.







[GitHub] [iotdb] neuyilan commented on a change in pull request #3364: Fix the session can not remove the broken cached leader connection.

Posted by GitBox <gi...@apache.org>.
neuyilan commented on a change in pull request #3364:
URL: https://github.com/apache/iotdb/pull/3364#discussion_r646381348



##########
File path: session/src/main/java/org/apache/iotdb/session/Session.java
##########
@@ -1257,13 +1283,15 @@ private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, boolean s
     EndPoint endPoint;
     SessionConnection connection;
     Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
+    Map<SessionConnection, EndPoint> sessionConnectionEndPointMap = new HashMap<>();
     for (Entry<String, Tablet> entry : tablets.entrySet()) {
       endPoint = deviceIdToEndpoint.get(entry.getKey());
       if (endPoint != null) {
         connection = endPointToSessionConnection.get(endPoint);
       } else {
         connection = defaultSessionConnection;
       }
+      sessionConnectionEndPointMap.putIfAbsent(connection, endPoint);

Review comment:
       That's another solution. My original thought was that, since the number of connections is really small, it is OK to add one map here and trade space for time.
   As you say, `IoTDBConnectionException` is probably rarely thrown, so I agree with you.







[GitHub] [iotdb] mychaow merged pull request #3364: Remove the broken cached leader connection & optimize the insertRecords method in session

Posted by GitBox <gi...@apache.org>.
mychaow merged pull request #3364:
URL: https://github.com/apache/iotdb/pull/3364


   





[GitHub] [iotdb] neuyilan commented on a change in pull request #3364: Fix the session can not remove the broken cached leader connection.

Posted by GitBox <gi...@apache.org>.
neuyilan commented on a change in pull request #3364:
URL: https://github.com/apache/iotdb/pull/3364#discussion_r646382202



##########
File path: session/src/main/java/org/apache/iotdb/session/Session.java
##########
@@ -1096,6 +1118,10 @@ private void insertRecordsWithLeaderCache(
         handleRedirection(entry.getKey(), e.getEndPoint());
       } catch (StatementExecutionException e) {
         errMsgBuilder.append(e.getMessage());
+      } catch (IoTDBConnectionException e) {

Review comment:
       OK, I'll optimize this later in this PR.







[GitHub] [iotdb] neuyilan commented on pull request #3364: Fix the session can not remove the broken cached leader connection.

Posted by GitBox <gi...@apache.org>.
neuyilan commented on pull request #3364:
URL: https://github.com/apache/iotdb/pull/3364#issuecomment-855726909


   > How about adding some UTs for `removeBrokenSessionConnection` in `SessionCacheLeaderUT`.
   
   Sure





[GitHub] [iotdb] neuyilan commented on a change in pull request #3364: Remove the broken cached leader connection & optimize the insertRecords method in session

Posted by GitBox <gi...@apache.org>.
neuyilan commented on a change in pull request #3364:
URL: https://github.com/apache/iotdb/pull/3364#discussion_r647074582



##########
File path: session/src/main/java/org/apache/iotdb/session/Session.java
##########
@@ -685,6 +685,27 @@ private SessionConnection getSessionConnection(String deviceId) {
     }
   }
 
+  // TODO https://issues.apache.org/jira/browse/IOTDB-1399
+  private void removeBrokenSessionConnection(SessionConnection sessionConnection) {
+    // remove the cached broken leader session
+    if (enableCacheLeader) {
+      EndPoint endPoint = null;
+      for (Map.Entry<EndPoint, SessionConnection> entry : endPointToSessionConnection.entrySet()) {
+        if (entry.getValue().equals(sessionConnection)) {
+          endPoint = entry.getKey();
+          endPointToSessionConnection.remove(endPoint);
+          break;
+        }
+      }
+
+      for (Map.Entry<String, EndPoint> entry : deviceIdToEndpoint.entrySet()) {
+        if (entry.getValue().equals(endPoint)) {
+          deviceIdToEndpoint.remove(entry.getKey());

Review comment:
       Thanks, done

##########
File path: service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
##########
@@ -95,6 +95,22 @@ public static void verifySuccessWithRedirection(TSStatus status)
     }
   }
 
+  public static void verifySuccessWithRedirectionForMultiDevices(
+      TSStatus status, List<String> devices) throws StatementExecutionException, RedirectException {
+    verifySuccess(status);
+    if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      Map<String, EndPoint> deviceEndPointMap = new HashMap<>();
+      List<TSStatus> statusSubStatus = status.getSubStatus();
+      for (int i = 0; i < statusSubStatus.size(); i++) {
+        TSStatus subStatus = statusSubStatus.get(i);
+        if (subStatus.isSetRedirectNode()) {
+          deviceEndPointMap.put(devices.get(i), subStatus.getRedirectNode());
+        }
+      }
+      throw new RedirectException(deviceEndPointMap);
+    }
+  }
+
   public static void verifySuccessWithRedirectionForInsertTablets(

Review comment:
       sure







[GitHub] [iotdb] LebronAl commented on a change in pull request #3364: Remove the broken cached leader connection & optimize the insertRecords method in session

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #3364:
URL: https://github.com/apache/iotdb/pull/3364#discussion_r647045641



##########
File path: session/src/main/java/org/apache/iotdb/session/Session.java
##########
@@ -685,6 +685,27 @@ private SessionConnection getSessionConnection(String deviceId) {
     }
   }
 
+  // TODO https://issues.apache.org/jira/browse/IOTDB-1399
+  private void removeBrokenSessionConnection(SessionConnection sessionConnection) {
+    // remove the cached broken leader session
+    if (enableCacheLeader) {
+      EndPoint endPoint = null;
+      for (Map.Entry<EndPoint, SessionConnection> entry : endPointToSessionConnection.entrySet()) {
+        if (entry.getValue().equals(sessionConnection)) {
+          endPoint = entry.getKey();
+          endPointToSessionConnection.remove(endPoint);
+          break;
+        }
+      }
+
+      for (Map.Entry<String, EndPoint> entry : deviceIdToEndpoint.entrySet()) {
+        if (entry.getValue().equals(endPoint)) {
+          deviceIdToEndpoint.remove(entry.getKey());

Review comment:
       Can we delete elements from a map while traversing it? Maybe we should use an `Iterator` instead.
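
To illustrate the concern, here is a minimal sketch of removing entries from a `HashMap` during traversal. It is not the code from this PR: the class and method names are hypothetical, and plain strings stand in for `EndPoint`. Calling `Map.remove` inside a for-each loop over `entrySet()` typically makes the next iteration step throw `ConcurrentModificationException`, whereas `Iterator.remove()` and `Collection.removeIf` are the supported ways to delete while iterating.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the deviceIdToEndpoint cleanup; endpoints are plain strings here.
final class BrokenEndpointCleanup {

  // Removes every device whose cached endpoint equals brokenEndpoint and returns the count.
  // Iterator.remove() keeps the iterator consistent with the map, so no
  // ConcurrentModificationException is raised on the next step.
  static int removeDevicesOf(Map<String, String> deviceIdToEndpoint, String brokenEndpoint) {
    int removed = 0;
    Iterator<Map.Entry<String, String>> it = deviceIdToEndpoint.entrySet().iterator();
    while (it.hasNext()) {
      if (Objects.equals(it.next().getValue(), brokenEndpoint)) {
        it.remove();
        removed++;
      }
    }
    return removed;
  }

  // Shorter equivalent: removing from the values() view also removes the map entries.
  static boolean removeDevicesOfWithRemoveIf(
      Map<String, String> deviceIdToEndpoint, String brokenEndpoint) {
    return deviceIdToEndpoint.values().removeIf(v -> Objects.equals(v, brokenEndpoint));
  }

  public static void main(String[] args) {
    Map<String, String> cache = new HashMap<>();
    cache.put("root.sg1.d1", "127.0.0.1:55561");
    cache.put("root.sg2.d1", "127.0.0.1:55562");
    cache.put("root.sg3.d1", "127.0.0.1:55561");
    int removed = removeDevicesOf(cache, "127.0.0.1:55561");
    System.out.println(removed + " removed, remaining: " + cache);
  }
}
```

`Objects.equals` is used so that a `null` endpoint (for example, when no matching connection was found) does not cause a `NullPointerException`.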

##########
File path: session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
##########
@@ -738,6 +1073,8 @@ private void addLine(
 
   static class MockSession extends Session {
 
+    private MockSessionConnection mockSessionConnection;

Review comment:
       The name `mockSessionConnection` is misleading; I didn't understand its meaning until after debugging. I think we can change it to a better one, such as `lastConstructedSessionConnection`.

##########
File path: service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
##########
@@ -95,6 +95,22 @@ public static void verifySuccessWithRedirection(TSStatus status)
     }
   }
 
+  public static void verifySuccessWithRedirectionForMultiDevices(
+      TSStatus status, List<String> devices) throws StatementExecutionException, RedirectException {
+    verifySuccess(status);
+    if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      Map<String, EndPoint> deviceEndPointMap = new HashMap<>();
+      List<TSStatus> statusSubStatus = status.getSubStatus();
+      for (int i = 0; i < statusSubStatus.size(); i++) {
+        TSStatus subStatus = statusSubStatus.get(i);
+        if (subStatus.isSetRedirectNode()) {
+          deviceEndPointMap.put(devices.get(i), subStatus.getRedirectNode());
+        }
+      }
+      throw new RedirectException(deviceEndPointMap);
+    }
+  }
+
   public static void verifySuccessWithRedirectionForInsertTablets(

Review comment:
       As we only use `req.getDeviceIds()` here, what about merging this function with `verifySuccessWithRedirectionForMultiDevices`?







[GitHub] [iotdb] LebronAl commented on a change in pull request #3364: Fix the session can not remove the broken cached leader connection.

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #3364:
URL: https://github.com/apache/iotdb/pull/3364#discussion_r646309591



##########
File path: session/src/main/java/org/apache/iotdb/session/Session.java
##########
@@ -1096,6 +1118,10 @@ private void insertRecordsWithLeaderCache(
         handleRedirection(entry.getKey(), e.getEndPoint());
       } catch (StatementExecutionException e) {
         errMsgBuilder.append(e.getMessage());
+      } catch (IoTDBConnectionException e) {

Review comment:
       Maybe we can optimize the insertRecords interface to perform one request per sessionConnection rather than per deviceID, just like insertTablets.
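
A rough sketch of the grouping idea, under stated assumptions: strings stand in for `SessionConnection`, and the class and method names are hypothetical rather than taken from `Session.java`. Record indexes are bucketed by the connection that serves their device, so one request can later be built per connection instead of per device.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; connections are represented by plain strings.
final class GroupByConnection {

  // Maps each connection to the indexes of the records it should receive.
  // Devices without a cached leader fall back to defaultConnection.
  static Map<String, List<Integer>> groupRecordIndexes(
      List<String> deviceIds, Map<String, String> deviceIdToConnection, String defaultConnection) {
    Map<String, List<Integer>> groups = new LinkedHashMap<>();
    for (int i = 0; i < deviceIds.size(); i++) {
      String connection = deviceIdToConnection.getOrDefault(deviceIds.get(i), defaultConnection);
      groups.computeIfAbsent(connection, k -> new ArrayList<>()).add(i);
    }
    return groups;
  }

  public static void main(String[] args) {
    Map<String, String> cache = new HashMap<>();
    cache.put("root.sg1.d1", "conn-55561");
    cache.put("root.sg2.d1", "conn-55562");
    List<String> deviceIds =
        Arrays.asList("root.sg1.d1", "root.sg2.d1", "root.sg1.d1", "root.sg3.d1");
    // Each map entry corresponds to one request that would be sent on that connection.
    System.out.println(groupRecordIndexes(deviceIds, cache, "conn-default"));
  }
}
```

With this shape, the number of requests is bounded by the number of connections rather than the number of distinct devices in the batch.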

##########
File path: session/src/main/java/org/apache/iotdb/session/Session.java
##########
@@ -1257,13 +1283,15 @@ private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, boolean s
     EndPoint endPoint;
     SessionConnection connection;
     Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
+    Map<SessionConnection, EndPoint> sessionConnectionEndPointMap = new HashMap<>();
     for (Entry<String, Tablet> entry : tablets.entrySet()) {
       endPoint = deviceIdToEndpoint.get(entry.getKey());
       if (endPoint != null) {
         connection = endPointToSessionConnection.get(endPoint);
       } else {
         connection = defaultSessionConnection;
       }
+      sessionConnectionEndPointMap.putIfAbsent(connection, endPoint);

Review comment:
       As `IoTDBConnectionException` is rarely thrown, I doubt it is worth recording this mapping on every call when no error occurs. Maybe we can traverse `endPointToSessionConnection` to delete the expired sessionConnection only when `IoTDBConnectionException` is thrown.
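
The alternative could look roughly like the sketch below, assuming a generic map in place of `endPointToSessionConnection`; the helper name `removeByValue` is hypothetical. The reverse lookup costs a linear scan, but it runs only on the error path, so the common path carries no extra bookkeeping.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper: find and drop the first entry holding the given value.
final class LazyReverseLookup {

  // Returns the key that was removed, or null if no entry holds the value.
  // Intended to be called only after a connection failure has been detected.
  static <K, V> K removeByValue(Map<K, V> map, V value) {
    Iterator<Map.Entry<K, V>> it = map.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<K, V> entry = it.next();
      if (Objects.equals(entry.getValue(), value)) {
        K key = entry.getKey(); // read the key before removing the entry
        it.remove();
        return key;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    Map<String, String> endPointToConnection = new HashMap<>();
    endPointToConnection.put("127.0.0.1:55560", "conn-A");
    endPointToConnection.put("127.0.0.1:55562", "conn-B");
    String brokenEndPoint = removeByValue(endPointToConnection, "conn-B");
    System.out.println("removed " + brokenEndPoint + ", remaining " + endPointToConnection);
  }
}
```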







[GitHub] [iotdb] LebronAl commented on a change in pull request #3364: Remove the broken cached leader connection & optimize the insertRecords method in session

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #3364:
URL: https://github.com/apache/iotdb/pull/3364#discussion_r647113127



##########
File path: session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
##########
@@ -709,6 +712,338 @@ public void testInsertTablets() throws IoTDBConnectionException, StatementExecut
     session.close();
   }
 
+  @Test
+  public void testInsertRecordsWithSessionBroken() throws StatementExecutionException {
+    // without leader cache
+    session = new MockSession("127.0.0.1", 55560, false);
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      fail(e.getMessage());
+    }
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertNull(session.deviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+    ((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
+
+    List<String> allDeviceIds =
+        new ArrayList<String>() {
+          {
+            add("root.sg1.d1");
+            add("root.sg2.d1");
+            add("root.sg3.d1");
+            add("root.sg4.d1");
+          }
+        };
+    List<String> measurements = new ArrayList<>();
+    measurements.add("s1");
+    measurements.add("s2");
+    measurements.add("s3");
+
+    List<String> deviceIds = new ArrayList<>();
+    List<List<String>> measurementsList = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+    List<Long> timestamps = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+
+    for (long time = 0; time < 500; time++) {
+      List<Object> values = new ArrayList<>();
+      List<TSDataType> types = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
+      measurementsList.add(measurements);
+      valuesList.add(values);
+      typesList.add(types);
+      timestamps.add(time);
+
+      if (time != 0 && time % 100 == 0) {
+        try {
+          session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+        } catch (IoTDBConnectionException e) {
+          Assert.assertEquals(
+              "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken",
+              e.getMessage());
+        }
+        deviceIds.clear();
+        measurementsList.clear();
+        valuesList.clear();
+        timestamps.clear();
+      }
+    }
+
+    try {
+      session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+    } catch (IoTDBConnectionException e) {
+      Assert.assertEquals(
+          "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken", e.getMessage());
+    }
+    deviceIds.clear();
+    measurementsList.clear();
+    valuesList.clear();
+    timestamps.clear();
+
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertNull(session.deviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+    try {
+      session.close();
+    } catch (IoTDBConnectionException e) {
+      Assert.assertEquals(
+          "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken", e.getMessage());
+    }
+
+    // with leader cache
+    // reset connection
+    session = new MockSession("127.0.0.1", 55560, true);
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertEquals(0, session.deviceIdToEndpoint.size());
+    assertEquals(1, session.endPointToSessionConnection.size());
+    for (long time = 0; time < 500; time++) {
+      List<Object> values = new ArrayList<>();
+      List<TSDataType> types = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
+      measurementsList.add(measurements);
+      valuesList.add(values);
+      typesList.add(types);
+      timestamps.add(time);
+      if (time != 0 && time % 100 == 0) {
+        try {
+          session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+        } catch (IoTDBConnectionException e) {
+          Assert.fail(e.getMessage());
+        }
+        deviceIds.clear();
+        measurementsList.clear();
+        valuesList.clear();
+        timestamps.clear();
+      }
+    }
+
+    // set connection as broken, due to we enable the cache leader, when we called
+    // ((MockSession) session).getMockSessionConnection(), the session's endpoint have changed to

Review comment:
       ```suggestion
   // ((MockSession) session).getLastConstructedSessionConnection(), the session's endpoint has been changed to
   ```

##########
File path: session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
##########
@@ -709,6 +712,338 @@ public void testInsertTablets() throws IoTDBConnectionException, StatementExecut
     session.close();
   }
 
+  @Test
+  public void testInsertRecordsWithSessionBroken() throws StatementExecutionException {
+    // without leader cache
+    session = new MockSession("127.0.0.1", 55560, false);
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      fail(e.getMessage());
+    }
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertNull(session.deviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+    ((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
+
+    List<String> allDeviceIds =
+        new ArrayList<String>() {
+          {
+            add("root.sg1.d1");
+            add("root.sg2.d1");
+            add("root.sg3.d1");
+            add("root.sg4.d1");
+          }
+        };
+    List<String> measurements = new ArrayList<>();
+    measurements.add("s1");
+    measurements.add("s2");
+    measurements.add("s3");
+
+    List<String> deviceIds = new ArrayList<>();
+    List<List<String>> measurementsList = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+    List<Long> timestamps = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+
+    for (long time = 0; time < 500; time++) {
+      List<Object> values = new ArrayList<>();
+      List<TSDataType> types = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
+      measurementsList.add(measurements);
+      valuesList.add(values);
+      typesList.add(types);
+      timestamps.add(time);
+
+      if (time != 0 && time % 100 == 0) {
+        try {
+          session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+        } catch (IoTDBConnectionException e) {
+          Assert.assertEquals(
+              "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken",
+              e.getMessage());
+        }
+        deviceIds.clear();
+        measurementsList.clear();
+        valuesList.clear();
+        timestamps.clear();
+      }
+    }
+
+    try {
+      session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+    } catch (IoTDBConnectionException e) {
+      Assert.assertEquals(
+          "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken", e.getMessage());
+    }
+    deviceIds.clear();
+    measurementsList.clear();
+    valuesList.clear();
+    timestamps.clear();
+
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertNull(session.deviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+    try {
+      session.close();
+    } catch (IoTDBConnectionException e) {
+      Assert.assertEquals(
+          "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken", e.getMessage());
+    }
+
+    // with leader cache
+    // reset connection
+    session = new MockSession("127.0.0.1", 55560, true);
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertEquals(0, session.deviceIdToEndpoint.size());
+    assertEquals(1, session.endPointToSessionConnection.size());
+    for (long time = 0; time < 500; time++) {
+      List<Object> values = new ArrayList<>();
+      List<TSDataType> types = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
+      measurementsList.add(measurements);
+      valuesList.add(values);
+      typesList.add(types);
+      timestamps.add(time);
+      if (time != 0 && time % 100 == 0) {
+        try {
+          session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+        } catch (IoTDBConnectionException e) {
+          Assert.fail(e.getMessage());
+        }
+        deviceIds.clear();
+        measurementsList.clear();
+        valuesList.clear();
+        timestamps.clear();
+      }
+    }
+
+    // set connection as broken, due to we enable the cache leader, when we called
+    // ((MockSession) session).getMockSessionConnection(), the session's endpoint have changed to
+    // EndPoint(ip:127.0.0.1, port:55562)
+    Assert.assertEquals(
+        "MockSessionConnection{ endPoint=EndPoint(ip:127.0.0.1, port:55562)}",
+        ((MockSession) session).getLastConstructedSessionConnection().toString());
+    ((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
+    try {
+      session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+    } catch (IoTDBConnectionException e) {
+      Assert.assertEquals(
+          "the session connection = EndPoint(ip:127.0.0.1, port:55562) is broken", e.getMessage());
+    }
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertEquals(3, session.deviceIdToEndpoint.size());
+    for (Map.Entry<String, EndPoint> endPointMap : session.deviceIdToEndpoint.entrySet()) {
+      assertEquals(getDeviceIdBelongedEndpoint(endPointMap.getKey()), endPointMap.getValue());
+    }
+    assertEquals(3, session.endPointToSessionConnection.size());
+    try {
+      session.close();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testInsertTabletsWithSessionBroken() throws StatementExecutionException {
+    // without leader cache
+    session = new MockSession("127.0.0.1", 55560, false);
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertNull(session.deviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+
+    // set the session connection as broken
+    ((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
+    List<String> allDeviceIds =
+        new ArrayList<String>() {
+          {
+            add("root.sg1.d1");
+            add("root.sg2.d1");
+            add("root.sg3.d1");
+            add("root.sg4.d1");
+          }
+        };
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+
+    Tablet tablet1 = new Tablet(allDeviceIds.get(1), schemaList, 100);
+    Tablet tablet2 = new Tablet(allDeviceIds.get(2), schemaList, 100);
+    Tablet tablet3 = new Tablet(allDeviceIds.get(3), schemaList, 100);
+
+    Map<String, Tablet> tabletMap = new HashMap<>();
+    tabletMap.put(allDeviceIds.get(1), tablet1);
+    tabletMap.put(allDeviceIds.get(2), tablet2);
+    tabletMap.put(allDeviceIds.get(3), tablet3);
+
+    long timestamp = System.currentTimeMillis();
+    for (long row = 0; row < 100; row++) {
+      int row1 = tablet1.rowSize++;
+      int row2 = tablet2.rowSize++;
+      int row3 = tablet3.rowSize++;
+      tablet1.addTimestamp(row1, timestamp);
+      tablet2.addTimestamp(row2, timestamp);
+      tablet3.addTimestamp(row3, timestamp);
+      for (int i = 0; i < 3; i++) {
+        long value = new Random().nextLong();
+        tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
+        tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
+        tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
+      }
+      if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
+        try {
+          session.insertTablets(tabletMap, true);
+        } catch (IoTDBConnectionException e) {
+          assertEquals(
+              "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken",
+              e.getMessage());
+        }
+        tablet1.reset();
+        tablet2.reset();
+        tablet3.reset();
+      }
+      timestamp++;
+    }
+
+    if (tablet1.rowSize != 0) {
+      try {
+        session.insertTablets(tabletMap, true);
+      } catch (IoTDBConnectionException e) {
+        Assert.fail(e.getMessage());
+      }
+
+      tablet1.reset();
+      tablet2.reset();
+      tablet3.reset();
+    }
+
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertNull(session.deviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+    try {
+      session.close();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // with leader cache
+    // reset the session connection
+    session = new MockSession("127.0.0.1", 55560, true);
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertEquals(0, session.deviceIdToEndpoint.size());
+    assertEquals(1, session.endPointToSessionConnection.size());
+
+    for (long row = 0; row < 100; row++) {
+      int row1 = tablet1.rowSize++;
+      int row2 = tablet2.rowSize++;
+      int row3 = tablet3.rowSize++;
+      tablet1.addTimestamp(row1, timestamp);
+      tablet2.addTimestamp(row2, timestamp);
+      tablet3.addTimestamp(row3, timestamp);
+      for (int i = 0; i < 3; i++) {
+        long value = new Random().nextLong();
+        tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
+        tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
+        tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
+      }
+      if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
+        try {
+          session.insertTablets(tabletMap, true);
+        } catch (IoTDBConnectionException e) {
+          Assert.fail(e.getMessage());
+        }
+        tablet1.reset();
+        tablet2.reset();
+        tablet3.reset();
+      }
+      timestamp++;
+    }
+
+    // set the session connection as broken
+    ((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
+    // set connection as broken, due to we enable the cache leader, when we called
+    // ((MockSession) session).getMockSessionConnection(), the session's endpoint have changed to

Review comment:
       ```suggestion
   // ((MockSession) session).getLastConstructedSessionConnection(), the session's endpoint has been changed to
   ```



