You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/06/08 06:21:17 UTC
[iotdb] branch master updated: Remove the broken cached leader
connection & optimize the insertRecords method in session (#3364)
This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 66918f3 Remove the broken cached leader connection & optimize the insertRecords method in session (#3364)
66918f3 is described below
commit 66918f3f8e2cbae03fe2bfd4ecc56c7dadd79cbf
Author: HouliangQi <ne...@163.com>
AuthorDate: Tue Jun 8 14:20:48 2021 +0800
Remove the broken cached leader connection & optimize the insertRecords method in session (#3364)
Co-authored-by: Potato <TX...@gmail.com>
---
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 8 +-
.../java/org/apache/iotdb/session/Session.java | 78 +++-
.../apache/iotdb/session/SessionConnection.java | 14 +-
.../apache/iotdb/session/SessionCacheLeaderUT.java | 434 ++++++++++++++++++++-
4 files changed, 499 insertions(+), 35 deletions(-)
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 5835c5e..d644319 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import java.lang.reflect.Proxy;
@@ -95,9 +94,8 @@ public class RpcUtils {
}
}
- public static void verifySuccessWithRedirectionForInsertTablets(
- TSStatus status, TSInsertTabletsReq req)
- throws StatementExecutionException, RedirectException {
+ public static void verifySuccessWithRedirectionForMultiDevices(
+ TSStatus status, List<String> devices) throws StatementExecutionException, RedirectException {
verifySuccess(status);
if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
Map<String, EndPoint> deviceEndPointMap = new HashMap<>();
@@ -105,7 +103,7 @@ public class RpcUtils {
for (int i = 0; i < statusSubStatus.size(); i++) {
TSStatus subStatus = statusSubStatus.get(i);
if (subStatus.isSetRedirectNode()) {
- deviceEndPointMap.put(req.getDeviceIds().get(i), subStatus.getRedirectNode());
+ deviceEndPointMap.put(devices.get(i), subStatus.getRedirectNode());
}
}
throw new RedirectException(deviceEndPointMap);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 41bcd2f..447c2b5 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -59,6 +59,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -685,6 +686,32 @@ public class Session {
}
}
+ // TODO https://issues.apache.org/jira/browse/IOTDB-1399
+ private void removeBrokenSessionConnection(SessionConnection sessionConnection) {
+ // remove the cached broken leader session
+ if (enableCacheLeader) {
+ EndPoint endPoint = null;
+ for (Iterator<Entry<EndPoint, SessionConnection>> it =
+ endPointToSessionConnection.entrySet().iterator();
+ it.hasNext(); ) {
+ Map.Entry<EndPoint, SessionConnection> entry = it.next();
+ if (entry.getValue().equals(sessionConnection)) {
+ endPoint = entry.getKey();
+ it.remove();
+ break;
+ }
+ }
+
+ for (Iterator<Entry<String, EndPoint>> it = deviceIdToEndpoint.entrySet().iterator();
+ it.hasNext(); ) {
+ Map.Entry<String, EndPoint> entry = it.next();
+ if (entry.getValue().equals(endPoint)) {
+ it.remove();
+ }
+ }
+ }
+ }
+
private void handleMetaRedirection(String storageGroup, RedirectException e)
throws IoTDBConnectionException {
if (enableCacheLeader) {
@@ -867,22 +894,36 @@ public class Session {
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- Map<String, TSInsertStringRecordsReq> deviceGroup = new HashMap<>();
+ Map<SessionConnection, TSInsertStringRecordsReq> recordsGroup = new HashMap<>();
+ EndPoint endPoint;
+ SessionConnection connection;
for (int i = 0; i < deviceIds.size(); i++) {
+ endPoint = deviceIdToEndpoint.get(deviceIds.get(i));
+ if (endPoint != null) {
+ connection = endPointToSessionConnection.get(endPoint);
+ } else {
+ connection = defaultSessionConnection;
+ }
TSInsertStringRecordsReq request =
- deviceGroup.computeIfAbsent(deviceIds.get(i), k -> new TSInsertStringRecordsReq());
+ recordsGroup.computeIfAbsent(connection, k -> new TSInsertStringRecordsReq());
updateTSInsertStringRecordsReq(
request, deviceIds.get(i), times.get(i), measurementsList.get(i), valuesList.get(i));
}
// TODO parallel
StringBuilder errMsgBuilder = new StringBuilder();
- for (Entry<String, TSInsertStringRecordsReq> entry : deviceGroup.entrySet()) {
+ for (Entry<SessionConnection, TSInsertStringRecordsReq> entry : recordsGroup.entrySet()) {
try {
- getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
+ entry.getKey().insertRecords(entry.getValue());
} catch (RedirectException e) {
- handleRedirection(entry.getKey(), e.getEndPoint());
+ for (Entry<String, EndPoint> deviceEndPointEntry : e.getDeviceEndPointMap().entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+ }
} catch (StatementExecutionException e) {
errMsgBuilder.append(e.getMessage());
+ } catch (IoTDBConnectionException e) {
+ // remove the broken session
+ removeBrokenSessionConnection(entry.getKey());
+ throw e;
}
}
String errMsg = errMsgBuilder.toString();
@@ -1075,10 +1116,18 @@ public class Session {
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- Map<String, TSInsertRecordsReq> deviceGroup = new HashMap<>();
+ Map<SessionConnection, TSInsertRecordsReq> recordsGroup = new HashMap<>();
+ EndPoint endPoint;
+ SessionConnection connection;
for (int i = 0; i < deviceIds.size(); i++) {
+ endPoint = deviceIdToEndpoint.get(deviceIds.get(i));
+ if (endPoint != null) {
+ connection = endPointToSessionConnection.get(endPoint);
+ } else {
+ connection = defaultSessionConnection;
+ }
TSInsertRecordsReq request =
- deviceGroup.computeIfAbsent(deviceIds.get(i), k -> new TSInsertRecordsReq());
+ recordsGroup.computeIfAbsent(connection, k -> new TSInsertRecordsReq());
updateTSInsertRecordsReq(
request,
deviceIds.get(i),
@@ -1089,13 +1138,19 @@ public class Session {
}
// TODO parallel
StringBuilder errMsgBuilder = new StringBuilder();
- for (Entry<String, TSInsertRecordsReq> entry : deviceGroup.entrySet()) {
+ for (Entry<SessionConnection, TSInsertRecordsReq> entry : recordsGroup.entrySet()) {
try {
- getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
+ entry.getKey().insertRecords(entry.getValue());
} catch (RedirectException e) {
- handleRedirection(entry.getKey(), e.getEndPoint());
+ for (Entry<String, EndPoint> deviceEndPointEntry : e.getDeviceEndPointMap().entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+ }
} catch (StatementExecutionException e) {
errMsgBuilder.append(e.getMessage());
+ } catch (IoTDBConnectionException e) {
+ // remove the broken session
+ removeBrokenSessionConnection(entry.getKey());
+ throw e;
}
}
String errMsg = errMsgBuilder.toString();
@@ -1280,6 +1335,9 @@ public class Session {
}
} catch (StatementExecutionException e) {
errMsgBuilder.append(e.getMessage());
+ } catch (IoTDBConnectionException e) {
+ removeBrokenSessionConnection(entry.getKey());
+ throw e;
}
}
String errMsg = errMsgBuilder.toString();
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 57f91ca..387902b 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -475,7 +475,8 @@ public class SessionConnection {
throws IoTDBConnectionException, StatementExecutionException, RedirectException {
request.setSessionId(sessionId);
try {
- RpcUtils.verifySuccessWithRedirection(client.insertRecords(request));
+ RpcUtils.verifySuccessWithRedirectionForMultiDevices(
+ client.insertRecords(request), request.getDeviceIds());
} catch (TException e) {
if (reconnect()) {
try {
@@ -494,7 +495,8 @@ public class SessionConnection {
throws IoTDBConnectionException, StatementExecutionException, RedirectException {
request.setSessionId(sessionId);
try {
- RpcUtils.verifySuccessWithRedirection(client.insertStringRecords(request));
+ RpcUtils.verifySuccessWithRedirectionForMultiDevices(
+ client.insertStringRecords(request), request.getDeviceIds());
} catch (TException e) {
if (reconnect()) {
try {
@@ -551,7 +553,8 @@ public class SessionConnection {
throws IoTDBConnectionException, StatementExecutionException, RedirectException {
request.setSessionId(sessionId);
try {
- RpcUtils.verifySuccessWithRedirectionForInsertTablets(client.insertTablets(request), request);
+ RpcUtils.verifySuccessWithRedirectionForMultiDevices(
+ client.insertTablets(request), request.getDeviceIds());
} catch (TException e) {
if (reconnect()) {
try {
@@ -790,4 +793,9 @@ public class SessionConnection {
public void setEndPoint(EndPoint endPoint) {
this.endPoint = endPoint;
}
+
+ @Override
+ public String toString() {
+ return "SessionConnection{" + " endPoint=" + endPoint + "}";
+ }
}
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
index 7e7f4e6..47e3367 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
import org.junit.Test;
import java.time.ZoneId;
@@ -47,6 +48,7 @@ import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
public class SessionCacheLeaderUT {
@@ -309,6 +311,7 @@ public class SessionCacheLeaderUT {
timestamps.clear();
}
}
+
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
deviceIds.clear();
measurementsList.clear();
@@ -709,6 +712,338 @@ public class SessionCacheLeaderUT {
session.close();
}
+ @Test
+ public void testInsertRecordsWithSessionBroken() throws StatementExecutionException {
+ // without leader cache
+ session = new MockSession("127.0.0.1", 55560, false);
+ try {
+ session.open();
+ } catch (IoTDBConnectionException e) {
+ fail(e.getMessage());
+ }
+ assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+ assertNull(session.deviceIdToEndpoint);
+ assertNull(session.endPointToSessionConnection);
+ ((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
+
+ List<String> allDeviceIds =
+ new ArrayList<String>() {
+ {
+ add("root.sg1.d1");
+ add("root.sg2.d1");
+ add("root.sg3.d1");
+ add("root.sg4.d1");
+ }
+ };
+ List<String> measurements = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ measurements.add("s3");
+
+ List<String> deviceIds = new ArrayList<>();
+ List<List<String>> measurementsList = new ArrayList<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+ List<Long> timestamps = new ArrayList<>();
+ List<List<TSDataType>> typesList = new ArrayList<>();
+
+ for (long time = 0; time < 500; time++) {
+ List<Object> values = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ values.add(1L);
+ values.add(2L);
+ values.add(3L);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
+ measurementsList.add(measurements);
+ valuesList.add(values);
+ typesList.add(types);
+ timestamps.add(time);
+
+ if (time != 0 && time % 100 == 0) {
+ try {
+ session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+ } catch (IoTDBConnectionException e) {
+ Assert.assertEquals(
+ "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken",
+ e.getMessage());
+ }
+ deviceIds.clear();
+ measurementsList.clear();
+ valuesList.clear();
+ timestamps.clear();
+ }
+ }
+
+ try {
+ session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+ } catch (IoTDBConnectionException e) {
+ Assert.assertEquals(
+ "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken", e.getMessage());
+ }
+ deviceIds.clear();
+ measurementsList.clear();
+ valuesList.clear();
+ timestamps.clear();
+
+ assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+ assertNull(session.deviceIdToEndpoint);
+ assertNull(session.endPointToSessionConnection);
+ try {
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ Assert.assertEquals(
+ "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken", e.getMessage());
+ }
+
+ // with leader cache
+ // reset connection
+ session = new MockSession("127.0.0.1", 55560, true);
+ try {
+ session.open();
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+ assertEquals(0, session.deviceIdToEndpoint.size());
+ assertEquals(1, session.endPointToSessionConnection.size());
+ for (long time = 0; time < 500; time++) {
+ List<Object> values = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ values.add(1L);
+ values.add(2L);
+ values.add(3L);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
+ measurementsList.add(measurements);
+ valuesList.add(values);
+ typesList.add(types);
+ timestamps.add(time);
+ if (time != 0 && time % 100 == 0) {
+ try {
+ session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ deviceIds.clear();
+ measurementsList.clear();
+ valuesList.clear();
+ timestamps.clear();
+ }
+ }
+
+ // Mark the connection as broken. Because the leader cache is enabled, the connection returned
+ // by ((MockSession) session).getLastConstructedSessionConnection() is the one most recently
+ // constructed, whose endpoint is EndPoint(ip:127.0.0.1, port:55562).
+ Assert.assertEquals(
+ "MockSessionConnection{ endPoint=EndPoint(ip:127.0.0.1, port:55562)}",
+ ((MockSession) session).getLastConstructedSessionConnection().toString());
+ ((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
+ try {
+ session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+ } catch (IoTDBConnectionException e) {
+ Assert.assertEquals(
+ "the session connection = EndPoint(ip:127.0.0.1, port:55562) is broken", e.getMessage());
+ }
+ assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+ assertEquals(3, session.deviceIdToEndpoint.size());
+ for (Map.Entry<String, EndPoint> endPointMap : session.deviceIdToEndpoint.entrySet()) {
+ assertEquals(getDeviceIdBelongedEndpoint(endPointMap.getKey()), endPointMap.getValue());
+ }
+ assertEquals(3, session.endPointToSessionConnection.size());
+ try {
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInsertTabletsWithSessionBroken() throws StatementExecutionException {
+ // without leader cache
+ session = new MockSession("127.0.0.1", 55560, false);
+ try {
+ session.open();
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+ assertNull(session.deviceIdToEndpoint);
+ assertNull(session.endPointToSessionConnection);
+
+ // set the session connection as broken
+ ((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
+ List<String> allDeviceIds =
+ new ArrayList<String>() {
+ {
+ add("root.sg1.d1");
+ add("root.sg2.d1");
+ add("root.sg3.d1");
+ add("root.sg4.d1");
+ }
+ };
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+ schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+ schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+
+ Tablet tablet1 = new Tablet(allDeviceIds.get(1), schemaList, 100);
+ Tablet tablet2 = new Tablet(allDeviceIds.get(2), schemaList, 100);
+ Tablet tablet3 = new Tablet(allDeviceIds.get(3), schemaList, 100);
+
+ Map<String, Tablet> tabletMap = new HashMap<>();
+ tabletMap.put(allDeviceIds.get(1), tablet1);
+ tabletMap.put(allDeviceIds.get(2), tablet2);
+ tabletMap.put(allDeviceIds.get(3), tablet3);
+
+ long timestamp = System.currentTimeMillis();
+ for (long row = 0; row < 100; row++) {
+ int row1 = tablet1.rowSize++;
+ int row2 = tablet2.rowSize++;
+ int row3 = tablet3.rowSize++;
+ tablet1.addTimestamp(row1, timestamp);
+ tablet2.addTimestamp(row2, timestamp);
+ tablet3.addTimestamp(row3, timestamp);
+ for (int i = 0; i < 3; i++) {
+ long value = new Random().nextLong();
+ tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
+ tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
+ tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
+ }
+ if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
+ try {
+ session.insertTablets(tabletMap, true);
+ } catch (IoTDBConnectionException e) {
+ assertEquals(
+ "the session connection = EndPoint(ip:127.0.0.1, port:55560) is broken",
+ e.getMessage());
+ }
+ tablet1.reset();
+ tablet2.reset();
+ tablet3.reset();
+ }
+ timestamp++;
+ }
+
+ if (tablet1.rowSize != 0) {
+ try {
+ session.insertTablets(tabletMap, true);
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ tablet1.reset();
+ tablet2.reset();
+ tablet3.reset();
+ }
+
+ assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+ assertNull(session.deviceIdToEndpoint);
+ assertNull(session.endPointToSessionConnection);
+ try {
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ // with leader cache
+ // reset the session connection
+ session = new MockSession("127.0.0.1", 55560, true);
+ try {
+ session.open();
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+ assertEquals(0, session.deviceIdToEndpoint.size());
+ assertEquals(1, session.endPointToSessionConnection.size());
+
+ for (long row = 0; row < 100; row++) {
+ int row1 = tablet1.rowSize++;
+ int row2 = tablet2.rowSize++;
+ int row3 = tablet3.rowSize++;
+ tablet1.addTimestamp(row1, timestamp);
+ tablet2.addTimestamp(row2, timestamp);
+ tablet3.addTimestamp(row3, timestamp);
+ for (int i = 0; i < 3; i++) {
+ long value = new Random().nextLong();
+ tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
+ tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
+ tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
+ }
+ if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
+ try {
+ session.insertTablets(tabletMap, true);
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ tablet1.reset();
+ tablet2.reset();
+ tablet3.reset();
+ }
+ timestamp++;
+ }
+
+ // set the session connection as broken
+ ((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
+ // Because the leader cache is enabled, the connection just marked as broken, i.e. the one
+ // returned by ((MockSession) session).getLastConstructedSessionConnection(), is the one most
+ // recently constructed, whose endpoint is EndPoint(ip:127.0.0.1, port:55562).
+ Assert.assertEquals(
+ "MockSessionConnection{ endPoint=EndPoint(ip:127.0.0.1, port:55562)}",
+ ((MockSession) session).getLastConstructedSessionConnection().toString());
+
+ for (long row = 0; row < 10; row++) {
+ int row1 = tablet1.rowSize++;
+ int row2 = tablet2.rowSize++;
+ int row3 = tablet3.rowSize++;
+ tablet1.addTimestamp(row1, timestamp);
+ tablet2.addTimestamp(row2, timestamp);
+ tablet3.addTimestamp(row3, timestamp);
+ for (int i = 0; i < 3; i++) {
+ long value = new Random().nextLong();
+ tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
+ tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
+ tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
+ }
+ if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
+ try {
+ session.insertTablets(tabletMap, true);
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ tablet1.reset();
+ tablet2.reset();
+ tablet3.reset();
+ }
+ timestamp++;
+ }
+ try {
+ session.insertTablets(tabletMap, true);
+ } catch (IoTDBConnectionException e) {
+ Assert.assertEquals(
+ "the session connection = EndPoint(ip:127.0.0.1, port:55562) is broken", e.getMessage());
+ }
+ tablet1.reset();
+ tablet2.reset();
+ tablet3.reset();
+
+ assertEquals(session.metaSessionConnection, session.defaultSessionConnection);
+ assertEquals(2, session.deviceIdToEndpoint.size());
+ for (Map.Entry<String, EndPoint> endPointEntry : session.deviceIdToEndpoint.entrySet()) {
+ assertEquals(getDeviceIdBelongedEndpoint(endPointEntry.getKey()), endPointEntry.getValue());
+ }
+ assertEquals(3, session.endPointToSessionConnection.size());
+ try {
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
private void addLine(
List<Long> times,
List<List<String>> measurements,
@@ -738,6 +1073,8 @@ public class SessionCacheLeaderUT {
static class MockSession extends Session {
+ private MockSessionConnection lastConstructedSessionConnection;
+
public MockSession(String host, int rpcPort, boolean enableCacheLeader) {
super(
host,
@@ -754,69 +1091,132 @@ public class SessionCacheLeaderUT {
@Override
public SessionConnection constructSessionConnection(
Session session, EndPoint endpoint, ZoneId zoneId) {
- return new MockSessionConnection(session, endpoint, zoneId);
+ lastConstructedSessionConnection = new MockSessionConnection(session, endpoint, zoneId);
+ return lastConstructedSessionConnection;
+ }
+
+ public MockSessionConnection getLastConstructedSessionConnection() {
+ return lastConstructedSessionConnection;
}
}
static class MockSessionConnection extends SessionConnection {
+ private EndPoint endPoint;
+ private boolean connectionBroken;
+ private IoTDBConnectionException ioTDBConnectionException;
+
public MockSessionConnection(Session session, EndPoint endPoint, ZoneId zoneId) {
super();
+ this.endPoint = endPoint;
+ ioTDBConnectionException =
+ new IoTDBConnectionException(
+ String.format("the session connection = %s is broken", endPoint.toString()));
}
@Override
public void close() {}
@Override
- protected void setStorageGroup(String storageGroup) throws RedirectException {
+ protected void setStorageGroup(String storageGroup)
+ throws RedirectException, IoTDBConnectionException {
+ if (isConnectionBroken()) {
+ throw ioTDBConnectionException;
+ }
throw new RedirectException(endpoints.get(1));
}
@Override
- protected void deleteStorageGroups(List<String> storageGroups) throws RedirectException {
+ protected void deleteStorageGroups(List<String> storageGroups)
+ throws RedirectException, IoTDBConnectionException {
+ if (isConnectionBroken()) {
+ throw ioTDBConnectionException;
+ }
throw new RedirectException(endpoints.get(1));
}
@Override
- protected void insertRecord(TSInsertRecordReq request) throws RedirectException {
+ protected void insertRecord(TSInsertRecordReq request)
+ throws RedirectException, IoTDBConnectionException {
+ if (isConnectionBroken()) {
+ throw ioTDBConnectionException;
+ }
throw new RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
}
@Override
- protected void insertRecord(TSInsertStringRecordReq request) throws RedirectException {
+ protected void insertRecord(TSInsertStringRecordReq request)
+ throws RedirectException, IoTDBConnectionException {
+ if (isConnectionBroken()) {
+ throw ioTDBConnectionException;
+ }
throw new RedirectException(getDeviceIdBelongedEndpoint(request.deviceId));
}
@Override
- protected void insertRecords(TSInsertRecordsReq request) throws RedirectException {
- throw new RedirectException(getDeviceIdBelongedEndpoint(request.deviceIds.get(0)));
+ protected void insertRecords(TSInsertRecordsReq request)
+ throws RedirectException, IoTDBConnectionException {
+ if (isConnectionBroken()) {
+ throw ioTDBConnectionException;
+ }
+ throw getRedirectException(request.getDeviceIds());
}
@Override
- protected void insertRecords(TSInsertStringRecordsReq request) throws RedirectException {
- throw new RedirectException(getDeviceIdBelongedEndpoint(request.deviceIds.get(0)));
+ protected void insertRecords(TSInsertStringRecordsReq request)
+ throws RedirectException, IoTDBConnectionException {
+ if (isConnectionBroken()) {
+ throw ioTDBConnectionException;
+ }
+ throw getRedirectException(request.getDeviceIds());
}
@Override
protected void insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq request)
- throws RedirectException {
+ throws RedirectException, IoTDBConnectionException {
+ if (isConnectionBroken()) {
+ throw ioTDBConnectionException;
+ }
throw new RedirectException(getDeviceIdBelongedEndpoint(request.deviceId));
}
@Override
- protected void insertTablet(TSInsertTabletReq request) throws RedirectException {
+ protected void insertTablet(TSInsertTabletReq request)
+ throws RedirectException, IoTDBConnectionException {
+ if (isConnectionBroken()) {
+ throw ioTDBConnectionException;
+ }
throw new RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
}
@Override
- protected void insertTablets(TSInsertTabletsReq request) throws RedirectException {
+ protected void insertTablets(TSInsertTabletsReq request)
+ throws RedirectException, IoTDBConnectionException {
+ if (isConnectionBroken()) {
+ throw ioTDBConnectionException;
+ }
+ throw getRedirectException(request.getDeviceIds());
+ }
+
+ private RedirectException getRedirectException(List<String> deviceIds) {
Map<String, EndPoint> deviceEndPointMap = new HashMap<>();
- for (int i = 0; i < request.getDeviceIds().size(); i++) {
- deviceEndPointMap.put(
- request.getDeviceIds().get(i),
- getDeviceIdBelongedEndpoint(request.getDeviceIds().get(i)));
+ for (String deviceId : deviceIds) {
+ deviceEndPointMap.put(deviceId, getDeviceIdBelongedEndpoint(deviceId));
}
- throw new RedirectException(deviceEndPointMap);
+ return new RedirectException(deviceEndPointMap);
+ }
+
+ public boolean isConnectionBroken() {
+ return connectionBroken;
+ }
+
+ public void setConnectionBroken(boolean connectionBroken) {
+ this.connectionBroken = connectionBroken;
+ }
+
+ @Override
+ public String toString() {
+ return "MockSessionConnection{" + " endPoint=" + endPoint + "}";
}
}
}