You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/06/08 06:21:17 UTC

[iotdb] branch master updated: Remove the broken cached leader connection & optimize the insertRecords method in session (#3364)

This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 66918f3  Remove the broken cached leader connection & optimize the insertRecords method in session (#3364)
66918f3 is described below

commit 66918f3f8e2cbae03fe2bfd4ecc56c7dadd79cbf
Author: HouliangQi <ne...@163.com>
AuthorDate: Tue Jun 8 14:20:48 2021 +0800

    Remove the broken cached leader connection & optimize the insertRecords method in session (#3364)
    
    Co-authored-by: Potato <TX...@gmail.com>
---
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |   8 +-
 .../java/org/apache/iotdb/session/Session.java     |  78 +++-
 .../apache/iotdb/session/SessionConnection.java    |  14 +-
 .../apache/iotdb/session/SessionCacheLeaderUT.java | 434 ++++++++++++++++++++-
 4 files changed, 499 insertions(+), 35 deletions(-)

diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 5835c5e..d644319 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 import java.lang.reflect.Proxy;
@@ -95,9 +94,8 @@ public class RpcUtils {
     }
   }
 
-  public static void verifySuccessWithRedirectionForInsertTablets(
-      TSStatus status, TSInsertTabletsReq req)
-      throws StatementExecutionException, RedirectException {
+  public static void verifySuccessWithRedirectionForMultiDevices(
+      TSStatus status, List<String> devices) throws StatementExecutionException, RedirectException {
     verifySuccess(status);
     if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
       Map<String, EndPoint> deviceEndPointMap = new HashMap<>();
@@ -105,7 +103,7 @@ public class RpcUtils {
       for (int i = 0; i < statusSubStatus.size(); i++) {
         TSStatus subStatus = statusSubStatus.get(i);
         if (subStatus.isSetRedirectNode()) {
-          deviceEndPointMap.put(req.getDeviceIds().get(i), subStatus.getRedirectNode());
+          deviceEndPointMap.put(devices.get(i), subStatus.getRedirectNode());
         }
       }
       throw new RedirectException(deviceEndPointMap);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 41bcd2f..447c2b5 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -59,6 +59,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -685,6 +686,32 @@ public class Session {
     }
   }
 
+  // TODO https://issues.apache.org/jira/browse/IOTDB-1399
+  private void removeBrokenSessionConnection(SessionConnection sessionConnection) {
+    // remove the cached broken leader session
+    if (enableCacheLeader) {
+      EndPoint endPoint = null;
+      for (Iterator<Entry<EndPoint, SessionConnection>> it =
+              endPointToSessionConnection.entrySet().iterator();
+          it.hasNext(); ) {
+        Map.Entry<EndPoint, SessionConnection> entry = it.next();
+        if (entry.getValue().equals(sessionConnection)) {
+          endPoint = entry.getKey();
+          it.remove();
+          break;
+        }
+      }
+
+      for (Iterator<Entry<String, EndPoint>> it = deviceIdToEndpoint.entrySet().iterator();
+          it.hasNext(); ) {
+        Map.Entry<String, EndPoint> entry = it.next();
+        if (entry.getValue().equals(endPoint)) {
+          it.remove();
+        }
+      }
+    }
+  }
+
   private void handleMetaRedirection(String storageGroup, RedirectException e)
       throws IoTDBConnectionException {
     if (enableCacheLeader) {
@@ -867,22 +894,36 @@ public class Session {
       List<List<String>> measurementsList,
       List<List<String>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    Map<String, TSInsertStringRecordsReq> deviceGroup = new HashMap<>();
+    Map<SessionConnection, TSInsertStringRecordsReq> recordsGroup = new HashMap<>();
+    EndPoint endPoint;
+    SessionConnection connection;
     for (int i = 0; i < deviceIds.size(); i++) {
+      endPoint = deviceIdToEndpoint.get(deviceIds.get(i));
+      if (endPoint != null) {
+        connection = endPointToSessionConnection.get(endPoint);
+      } else {
+        connection = defaultSessionConnection;
+      }
       TSInsertStringRecordsReq request =
-          deviceGroup.computeIfAbsent(deviceIds.get(i), k -> new TSInsertStringRecordsReq());
+          recordsGroup.computeIfAbsent(connection, k -> new TSInsertStringRecordsReq());
       updateTSInsertStringRecordsReq(
           request, deviceIds.get(i), times.get(i), measurementsList.get(i), valuesList.get(i));
     }
     // TODO parallel
     StringBuilder errMsgBuilder = new StringBuilder();
-    for (Entry<String, TSInsertStringRecordsReq> entry : deviceGroup.entrySet()) {
+    for (Entry<SessionConnection, TSInsertStringRecordsReq> entry : recordsGroup.entrySet()) {
       try {
-        getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
+        entry.getKey().insertRecords(entry.getValue());
       } catch (RedirectException e) {
-        handleRedirection(entry.getKey(), e.getEndPoint());
+        for (Entry<String, EndPoint> deviceEndPointEntry : e.getDeviceEndPointMap().entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+        }
       } catch (StatementExecutionException e) {
         errMsgBuilder.append(e.getMessage());
+      } catch (IoTDBConnectionException e) {
+        // remove the broken session
+        removeBrokenSessionConnection(entry.getKey());
+        throw e;
       }
     }
     String errMsg = errMsgBuilder.toString();
@@ -1075,10 +1116,18 @@ public class Session {
       List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    Map<String, TSInsertRecordsReq> deviceGroup = new HashMap<>();
+    Map<SessionConnection, TSInsertRecordsReq> recordsGroup = new HashMap<>();
+    EndPoint endPoint;
+    SessionConnection connection;
     for (int i = 0; i < deviceIds.size(); i++) {
+      endPoint = deviceIdToEndpoint.get(deviceIds.get(i));
+      if (endPoint != null) {
+        connection = endPointToSessionConnection.get(endPoint);
+      } else {
+        connection = defaultSessionConnection;
+      }
       TSInsertRecordsReq request =
-          deviceGroup.computeIfAbsent(deviceIds.get(i), k -> new TSInsertRecordsReq());
+          recordsGroup.computeIfAbsent(connection, k -> new TSInsertRecordsReq());
       updateTSInsertRecordsReq(
           request,
           deviceIds.get(i),
@@ -1089,13 +1138,19 @@ public class Session {
     }
     // TODO parallel
     StringBuilder errMsgBuilder = new StringBuilder();
-    for (Entry<String, TSInsertRecordsReq> entry : deviceGroup.entrySet()) {
+    for (Entry<SessionConnection, TSInsertRecordsReq> entry : recordsGroup.entrySet()) {
       try {
-        getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
+        entry.getKey().insertRecords(entry.getValue());
       } catch (RedirectException e) {
-        handleRedirection(entry.getKey(), e.getEndPoint());
+        for (Entry<String, EndPoint> deviceEndPointEntry : e.getDeviceEndPointMap().entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+        }
       } catch (StatementExecutionException e) {
         errMsgBuilder.append(e.getMessage());
+      } catch (IoTDBConnectionException e) {
+        // remove the broken session
+        removeBrokenSessionConnection(entry.getKey());
+        throw e;
       }
     }
     String errMsg = errMsgBuilder.toString();
@@ -1280,6 +1335,9 @@ public class Session {
         }
       } catch (StatementExecutionException e) {
         errMsgBuilder.append(e.getMessage());
+      } catch (IoTDBConnectionException e) {
+        removeBrokenSessionConnection(entry.getKey());
+        throw e;
       }
     }
     String errMsg = errMsgBuilder.toString();
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 57f91ca..387902b 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -475,7 +475,8 @@ public class SessionConnection {
       throws IoTDBConnectionException, StatementExecutionException, RedirectException {
     request.setSessionId(sessionId);
     try {
-      RpcUtils.verifySuccessWithRedirection(client.insertRecords(request));
+      RpcUtils.verifySuccessWithRedirectionForMultiDevices(
+          client.insertRecords(request), request.getDeviceIds());
     } catch (TException e) {
       if (reconnect()) {
         try {
@@ -494,7 +495,8 @@ public class SessionConnection {
       throws IoTDBConnectionException, StatementExecutionException, RedirectException {
     request.setSessionId(sessionId);
     try {
-      RpcUtils.verifySuccessWithRedirection(client.insertStringRecords(request));
+      RpcUtils.verifySuccessWithRedirectionForMultiDevices(
+          client.insertStringRecords(request), request.getDeviceIds());
     } catch (TException e) {
       if (reconnect()) {
         try {
@@ -551,7 +553,8 @@ public class SessionConnection {
       throws IoTDBConnectionException, StatementExecutionException, RedirectException {
     request.setSessionId(sessionId);
     try {
-      RpcUtils.verifySuccessWithRedirectionForInsertTablets(client.insertTablets(request), request);
+      RpcUtils.verifySuccessWithRedirectionForMultiDevices(
+          client.insertTablets(request), request.getDeviceIds());
     } catch (TException e) {
       if (reconnect()) {
         try {
@@ -790,4 +793,9 @@ public class SessionConnection {
   public void setEndPoint(EndPoint endPoint) {
     this.endPoint = endPoint;
   }
+
+  @Override
+  public String toString() {
+    return "SessionConnection{" + " endPoint=" + endPoint + "}";
+  }
 }
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
index 7e7f4e6..47e3367 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.time.ZoneId;
@@ -47,6 +48,7 @@ import java.util.Random;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 public class SessionCacheLeaderUT {
 
@@ -309,6 +311,7 @@ public class SessionCacheLeaderUT {
         timestamps.clear();
       }
     }
+
     session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
     deviceIds.clear();
     measurementsList.clear();
@@ -709,6 +712,338 @@ public class SessionCacheLeaderUT {
     session.close();
   }
 
+  @Test
+  public void testInsertRecordsWithSessionBroken() throws StatementExecutionException {
+    // without leader cache
+    session = new MockSession("127.0.0.1", 55560, false);
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      fail(e.getMessage());
+    }
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertNull(session.deviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+    ((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
+
+    List<String> allDeviceIds =
+        new ArrayList<String>() {
+          {
+            add("root.sg1.d1");
+            add("root.sg2.d1");
+            add("root.sg3.d1");
+            add("root.sg4.d1");
+          }
+        };
+    List<String> measurements = new ArrayList<>();
+    measurements.add("s1");
+    measurements.add("s2");
+    measurements.add("s3");
+
+    List<String> deviceIds = new ArrayList<>();
+    List<List<String>> measurementsList = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+    List<Long> timestamps = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+
+    for (long time = 0; time < 500; time++) {
+      List<Object> values = new ArrayList<>();
+      List<TSDataType> types = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
+      measurementsList.add(measurements);
+      valuesList.add(values);
+      typesList.add(types);
+      timestamps.add(time);
+
+      if (time != 0 && time % 100 == 0) {
+        try {
+          session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+        } catch (IoTDBConnectionException e) {
+          Assert.assertEquals(
+              "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken",
+              e.getMessage());
+        }
+        deviceIds.clear();
+        measurementsList.clear();
+        valuesList.clear();
+        timestamps.clear();
+      }
+    }
+
+    try {
+      session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+    } catch (IoTDBConnectionException e) {
+      Assert.assertEquals(
+          "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken", e.getMessage());
+    }
+    deviceIds.clear();
+    measurementsList.clear();
+    valuesList.clear();
+    timestamps.clear();
+
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertNull(session.deviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+    try {
+      session.close();
+    } catch (IoTDBConnectionException e) {
+      Assert.assertEquals(
+          "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken", e.getMessage());
+    }
+
+    // with leader cache
+    // reset connection
+    session = new MockSession("127.0.0.1", 55560, true);
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertEquals(0, session.deviceIdToEndpoint.size());
+    assertEquals(1, session.endPointToSessionConnection.size());
+    for (long time = 0; time < 500; time++) {
+      List<Object> values = new ArrayList<>();
+      List<TSDataType> types = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
+      measurementsList.add(measurements);
+      valuesList.add(values);
+      typesList.add(types);
+      timestamps.add(time);
+      if (time != 0 && time % 100 == 0) {
+        try {
+          session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+        } catch (IoTDBConnectionException e) {
+          Assert.fail(e.getMessage());
+        }
+        deviceIds.clear();
+        measurementsList.clear();
+        valuesList.clear();
+        timestamps.clear();
+      }
+    }
+
+    // Mark the connection as broken. Because the leader cache is enabled, the connection that
+    // ((MockSession) session).getLastConstructedSessionConnection() returns is the one for
+    // EndPoint(ip:127.0.0.1, port:55562)
+    Assert.assertEquals(
+        "MockSessionConnection{ endPoint=EndPoint(ip:127.0.0.1, port:55562)}",
+        ((MockSession) session).getLastConstructedSessionConnection().toString());
+    ((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
+    try {
+      session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+    } catch (IoTDBConnectionException e) {
+      Assert.assertEquals(
+          "the session connection = EndPoint(ip:127.0.0.1, port:55562) is broken", e.getMessage());
+    }
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertEquals(3, session.deviceIdToEndpoint.size());
+    for (Map.Entry<String, EndPoint> endPointMap : session.deviceIdToEndpoint.entrySet()) {
+      assertEquals(getDeviceIdBelongedEndpoint(endPointMap.getKey()), endPointMap.getValue());
+    }
+    assertEquals(3, session.endPointToSessionConnection.size());
+    try {
+      session.close();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testInsertTabletsWithSessionBroken() throws StatementExecutionException {
+    // without leader cache
+    session = new MockSession("127.0.0.1", 55560, false);
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertNull(session.deviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+
+    // set the session connection as broken
+    ((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
+    List<String> allDeviceIds =
+        new ArrayList<String>() {
+          {
+            add("root.sg1.d1");
+            add("root.sg2.d1");
+            add("root.sg3.d1");
+            add("root.sg4.d1");
+          }
+        };
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+
+    Tablet tablet1 = new Tablet(allDeviceIds.get(1), schemaList, 100);
+    Tablet tablet2 = new Tablet(allDeviceIds.get(2), schemaList, 100);
+    Tablet tablet3 = new Tablet(allDeviceIds.get(3), schemaList, 100);
+
+    Map<String, Tablet> tabletMap = new HashMap<>();
+    tabletMap.put(allDeviceIds.get(1), tablet1);
+    tabletMap.put(allDeviceIds.get(2), tablet2);
+    tabletMap.put(allDeviceIds.get(3), tablet3);
+
+    long timestamp = System.currentTimeMillis();
+    for (long row = 0; row < 100; row++) {
+      int row1 = tablet1.rowSize++;
+      int row2 = tablet2.rowSize++;
+      int row3 = tablet3.rowSize++;
+      tablet1.addTimestamp(row1, timestamp);
+      tablet2.addTimestamp(row2, timestamp);
+      tablet3.addTimestamp(row3, timestamp);
+      for (int i = 0; i < 3; i++) {
+        long value = new Random().nextLong();
+        tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
+        tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
+        tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
+      }
+      if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
+        try {
+          session.insertTablets(tabletMap, true);
+        } catch (IoTDBConnectionException e) {
+          assertEquals(
+              "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken",
+              e.getMessage());
+        }
+        tablet1.reset();
+        tablet2.reset();
+        tablet3.reset();
+      }
+      timestamp++;
+    }
+
+    if (tablet1.rowSize != 0) {
+      try {
+        session.insertTablets(tabletMap, true);
+      } catch (IoTDBConnectionException e) {
+        Assert.fail(e.getMessage());
+      }
+
+      tablet1.reset();
+      tablet2.reset();
+      tablet3.reset();
+    }
+
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertNull(session.deviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+    try {
+      session.close();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // with leader cache
+    // reset the session connection
+    session = new MockSession("127.0.0.1", 55560, true);
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertEquals(0, session.deviceIdToEndpoint.size());
+    assertEquals(1, session.endPointToSessionConnection.size());
+
+    for (long row = 0; row < 100; row++) {
+      int row1 = tablet1.rowSize++;
+      int row2 = tablet2.rowSize++;
+      int row3 = tablet3.rowSize++;
+      tablet1.addTimestamp(row1, timestamp);
+      tablet2.addTimestamp(row2, timestamp);
+      tablet3.addTimestamp(row3, timestamp);
+      for (int i = 0; i < 3; i++) {
+        long value = new Random().nextLong();
+        tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
+        tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
+        tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
+      }
+      if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
+        try {
+          session.insertTablets(tabletMap, true);
+        } catch (IoTDBConnectionException e) {
+          Assert.fail(e.getMessage());
+        }
+        tablet1.reset();
+        tablet2.reset();
+        tablet3.reset();
+      }
+      timestamp++;
+    }
+
+    // set the session connection as broken
+    ((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
+    // The connection was marked as broken above. Because the leader cache is enabled, the one
+    // that ((MockSession) session).getLastConstructedSessionConnection() returns is for
+    // EndPoint(ip:127.0.0.1, port:55562)
+    Assert.assertEquals(
+        "MockSessionConnection{ endPoint=EndPoint(ip:127.0.0.1, port:55562)}",
+        ((MockSession) session).getLastConstructedSessionConnection().toString());
+
+    for (long row = 0; row < 10; row++) {
+      int row1 = tablet1.rowSize++;
+      int row2 = tablet2.rowSize++;
+      int row3 = tablet3.rowSize++;
+      tablet1.addTimestamp(row1, timestamp);
+      tablet2.addTimestamp(row2, timestamp);
+      tablet3.addTimestamp(row3, timestamp);
+      for (int i = 0; i < 3; i++) {
+        long value = new Random().nextLong();
+        tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
+        tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
+        tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
+      }
+      if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
+        try {
+          session.insertTablets(tabletMap, true);
+        } catch (IoTDBConnectionException e) {
+          Assert.fail(e.getMessage());
+        }
+        tablet1.reset();
+        tablet2.reset();
+        tablet3.reset();
+      }
+      timestamp++;
+    }
+    try {
+      session.insertTablets(tabletMap, true);
+    } catch (IoTDBConnectionException e) {
+      Assert.assertEquals(
+          "the session connection = EndPoint(ip:127.0.0.1, port:55562) is broken", e.getMessage());
+    }
+    tablet1.reset();
+    tablet2.reset();
+    tablet3.reset();
+
+    assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+    assertEquals(2, session.deviceIdToEndpoint.size());
+    for (Map.Entry<String, EndPoint> endPointEntry : session.deviceIdToEndpoint.entrySet()) {
+      assertEquals(getDeviceIdBelongedEndpoint(endPointEntry.getKey()), endPointEntry.getValue());
+    }
+    assertEquals(3, session.endPointToSessionConnection.size());
+    try {
+      session.close();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
   private void addLine(
       List<Long> times,
       List<List<String>> measurements,
@@ -738,6 +1073,8 @@ public class SessionCacheLeaderUT {
 
   static class MockSession extends Session {
 
+    private MockSessionConnection lastConstructedSessionConnection;
+
     public MockSession(String host, int rpcPort, boolean enableCacheLeader) {
       super(
           host,
@@ -754,69 +1091,132 @@ public class SessionCacheLeaderUT {
     @Override
     public SessionConnection constructSessionConnection(
         Session session, EndPoint endpoint, ZoneId zoneId) {
-      return new MockSessionConnection(session, endpoint, zoneId);
+      lastConstructedSessionConnection = new MockSessionConnection(session, endpoint, zoneId);
+      return lastConstructedSessionConnection;
+    }
+
+    public MockSessionConnection getLastConstructedSessionConnection() {
+      return lastConstructedSessionConnection;
     }
   }
 
   static class MockSessionConnection extends SessionConnection {
 
+    private EndPoint endPoint;
+    private boolean connectionBroken;
+    private IoTDBConnectionException ioTDBConnectionException;
+
     public MockSessionConnection(Session session, EndPoint endPoint, ZoneId zoneId) {
       super();
+      this.endPoint = endPoint;
+      ioTDBConnectionException =
+          new IoTDBConnectionException(
+              String.format("the session connection = %s is broken", endPoint.toString()));
     }
 
     @Override
     public void close() {}
 
     @Override
-    protected void setStorageGroup(String storageGroup) throws RedirectException {
+    protected void setStorageGroup(String storageGroup)
+        throws RedirectException, IoTDBConnectionException {
+      if (isConnectionBroken()) {
+        throw ioTDBConnectionException;
+      }
       throw new RedirectException(endpoints.get(1));
     }
 
     @Override
-    protected void deleteStorageGroups(List<String> storageGroups) throws RedirectException {
+    protected void deleteStorageGroups(List<String> storageGroups)
+        throws RedirectException, IoTDBConnectionException {
+      if (isConnectionBroken()) {
+        throw ioTDBConnectionException;
+      }
       throw new RedirectException(endpoints.get(1));
     }
 
     @Override
-    protected void insertRecord(TSInsertRecordReq request) throws RedirectException {
+    protected void insertRecord(TSInsertRecordReq request)
+        throws RedirectException, IoTDBConnectionException {
+      if (isConnectionBroken()) {
+        throw ioTDBConnectionException;
+      }
       throw new RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
     }
 
     @Override
-    protected void insertRecord(TSInsertStringRecordReq request) throws RedirectException {
+    protected void insertRecord(TSInsertStringRecordReq request)
+        throws RedirectException, IoTDBConnectionException {
+      if (isConnectionBroken()) {
+        throw ioTDBConnectionException;
+      }
       throw new RedirectException(getDeviceIdBelongedEndpoint(request.deviceId));
     }
 
     @Override
-    protected void insertRecords(TSInsertRecordsReq request) throws RedirectException {
-      throw new RedirectException(getDeviceIdBelongedEndpoint(request.deviceIds.get(0)));
+    protected void insertRecords(TSInsertRecordsReq request)
+        throws RedirectException, IoTDBConnectionException {
+      if (isConnectionBroken()) {
+        throw ioTDBConnectionException;
+      }
+      throw getRedirectException(request.getDeviceIds());
     }
 
     @Override
-    protected void insertRecords(TSInsertStringRecordsReq request) throws RedirectException {
-      throw new RedirectException(getDeviceIdBelongedEndpoint(request.deviceIds.get(0)));
+    protected void insertRecords(TSInsertStringRecordsReq request)
+        throws RedirectException, IoTDBConnectionException {
+      if (isConnectionBroken()) {
+        throw ioTDBConnectionException;
+      }
+      throw getRedirectException(request.getDeviceIds());
     }
 
     @Override
     protected void insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq request)
-        throws RedirectException {
+        throws RedirectException, IoTDBConnectionException {
+      if (isConnectionBroken()) {
+        throw ioTDBConnectionException;
+      }
       throw new RedirectException(getDeviceIdBelongedEndpoint(request.deviceId));
     }
 
     @Override
-    protected void insertTablet(TSInsertTabletReq request) throws RedirectException {
+    protected void insertTablet(TSInsertTabletReq request)
+        throws RedirectException, IoTDBConnectionException {
+      if (isConnectionBroken()) {
+        throw ioTDBConnectionException;
+      }
       throw new RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
     }
 
     @Override
-    protected void insertTablets(TSInsertTabletsReq request) throws RedirectException {
+    protected void insertTablets(TSInsertTabletsReq request)
+        throws RedirectException, IoTDBConnectionException {
+      if (isConnectionBroken()) {
+        throw ioTDBConnectionException;
+      }
+      throw getRedirectException(request.getDeviceIds());
+    }
+
+    private RedirectException getRedirectException(List<String> deviceIds) {
       Map<String, EndPoint> deviceEndPointMap = new HashMap<>();
-      for (int i = 0; i < request.getDeviceIds().size(); i++) {
-        deviceEndPointMap.put(
-            request.getDeviceIds().get(i),
-            getDeviceIdBelongedEndpoint(request.getDeviceIds().get(i)));
+      for (String deviceId : deviceIds) {
+        deviceEndPointMap.put(deviceId, getDeviceIdBelongedEndpoint(deviceId));
       }
-      throw new RedirectException(deviceEndPointMap);
+      return new RedirectException(deviceEndPointMap);
+    }
+
+    public boolean isConnectionBroken() {
+      return connectionBroken;
+    }
+
+    public void setConnectionBroken(boolean connectionBroken) {
+      this.connectionBroken = connectionBroken;
+    }
+
+    @Override
+    public String toString() {
+      return "MockSessionConnection{" + " endPoint=" + endPoint + "}";
     }
   }
 }