You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/11/30 01:56:07 UTC
[iotdb] branch cluster_new updated: fix many bugs and improve
performance
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_new
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster_new by this push:
new 57cfab3 fix many bugs and improve performance
new 65d7209 Merge pull request #2130 from LebronAl/cluster_new_fix_many_bugs
57cfab3 is described below
commit 57cfab39cb5b4c42a85e550ee9b9e2a0ee9aec7f
Author: LebronAl <TX...@gmail.com>
AuthorDate: Thu Nov 26 20:12:57 2020 +0800
fix many bugs and improve performance
---
.../iotdb/cluster/log/applier/BaseApplier.java | 2 +-
.../iotdb/cluster/log/applier/DataLogApplier.java | 9 +-
.../iotdb/cluster/log/snapshot/FileSnapshot.java | 2 +-
.../cluster/log/snapshot/PartitionedSnapshot.java | 2 +-
.../apache/iotdb/cluster/metadata/CMManager.java | 29 +++-
.../iotdb/cluster/query/ClusterPlanExecutor.java | 14 +-
.../iotdb/cluster/query/LocalQueryExecutor.java | 22 +--
.../cluster/query/aggregate/ClusterAggregator.java | 2 +-
.../cluster/query/fill/ClusterPreviousFill.java | 2 +-
.../query/last/ClusterLastQueryExecutor.java | 4 +-
.../cluster/query/reader/ClusterReaderFactory.java | 12 +-
.../iotdb/cluster/query/reader/DataSourceInfo.java | 57 +++++--
.../iotdb/cluster/server/DataClusterServer.java | 2 +-
.../org/apache/iotdb/cluster/server/Timer.java | 15 +-
.../handlers/caller/AppendNodeEntryHandler.java | 15 +-
.../cluster/server/member/MetaGroupMember.java | 45 ++++--
.../iotdb/cluster/server/member/RaftMember.java | 51 +++---
.../cluster/server/service/DataAsyncService.java | 8 +-
.../cluster/server/service/DataSyncService.java | 8 +-
.../apache/iotdb/cluster/utils/ClusterUtils.java | 2 +-
.../cluster/log/snapshot/DataSnapshotTest.java | 2 +-
.../iotdb/cluster/server/member/MemberTest.java | 56 +++++--
.../cluster/server/member/MetaGroupMemberTest.java | 3 +
.../org/apache/iotdb/db/service/TSServiceImpl.java | 54 ++++---
.../org/apache/iotdb/rpc/RedirectException.java | 13 ++
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 27 +++-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../java/org/apache/iotdb/session/Session.java | 178 ++++++++++++---------
.../apache/iotdb/session/SessionConnection.java | 5 +-
29 files changed, 414 insertions(+), 228 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index ac8db78..75cef69 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -87,7 +87,7 @@ abstract class BaseApplier implements LogApplier {
private void executeAfterSync(PhysicalPlan plan)
throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(true);
} catch (CheckConsistencyException ce) {
throw new QueryProcessException(ce.getMessage());
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 108007c..8ce84b5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -27,8 +27,10 @@ import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
@@ -76,7 +78,10 @@ public class DataLogApplier extends BaseApplier {
logger.error("Unsupported log: {}", log);
}
} catch (Exception e) {
- logger.debug("Exception occurred when applying {}", log, e);
+ Throwable rootCause = IOUtils.getRootCause(e);
+ if (!(rootCause instanceof PathNotExistException)) {
+ logger.debug("Exception occurred when applying {}", log, e);
+ }
log.setException(e);
} finally {
log.setApplied(true);
@@ -94,7 +99,7 @@ public class DataLogApplier extends BaseApplier {
// the sg may not exist because the node does not catch up with the leader, retry after
// synchronization
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(true);
} catch (CheckConsistencyException ce) {
throw new QueryProcessException(ce.getMessage());
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
index cd595ed..c5f75e9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
@@ -217,7 +217,7 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
throws SnapshotInstallationException {
// ensure StorageGroups are synchronized
try {
- dataGroupMember.getMetaGroupMember().syncLeaderWithConsistencyCheck();
+ dataGroupMember.getMetaGroupMember().syncLeaderWithConsistencyCheck(true);
} catch (CheckConsistencyException e) {
throw new SnapshotInstallationException(e);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
index d88bec5..a0eff88 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
@@ -193,7 +193,7 @@ public class PartitionedSnapshot<T extends Snapshot> extends Snapshot {
}
// ensure storage groups are synchronized
try {
- dataGroupMember.getMetaGroupMember().syncLeaderWithConsistencyCheck();
+ dataGroupMember.getMetaGroupMember().syncLeaderWithConsistencyCheck(true);
} catch (CheckConsistencyException e) {
throw new SnapshotInstallationException(e);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 5aebe7e..ac1a9bb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.MetaUtils;
import org.apache.iotdb.db.metadata.PartialPath;
@@ -547,9 +548,10 @@ public class CMManager extends MManager {
return false;
}
if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() &&
- result.getCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
- logger.error("{} failed to execute create timeseries {}", metaGroupMember.getThisNode(),
- seriesPath);
+ result.getCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode() &&
+ result.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+ logger.error("{} failed to execute create timeseries {}: {}", metaGroupMember.getThisNode(),
+ seriesPath, result);
return false;
}
}
@@ -611,7 +613,8 @@ public class CMManager extends MManager {
public void pullTimeSeriesSchemas(List<PartialPath> prefixPaths,
Node ignoredGroup)
throws MetadataException {
- logger.debug("{}: Pulling timeseries schemas of {}", metaGroupMember.getName(), prefixPaths);
+ logger.debug("{}: Pulling timeseries schemas of {}, ignored group {}",
+ metaGroupMember.getName(), prefixPaths, ignoredGroup);
// split the paths by the data groups that should hold them
Map<PartitionGroup, List<String>> partitionGroupPathMap = new HashMap<>();
for (PartialPath prefixPath : prefixPaths) {
@@ -843,7 +846,7 @@ public class CMManager extends MManager {
public Set<PartialPath> getMatchedDevices(PartialPath originPath) throws MetadataException {
// make sure this node knows all storage groups
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new MetadataException(e);
}
@@ -1078,7 +1081,7 @@ public class CMManager extends MManager {
int limit, int offset) throws MetadataException {
// make sure this node knows all storage groups
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new MetadataException(e);
}
@@ -1116,7 +1119,7 @@ public class CMManager extends MManager {
public List<PartialPath> getMatchedPaths(PartialPath originPath) throws MetadataException {
// make sure this node knows all storage groups
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new MetadataException(e);
}
@@ -1322,7 +1325,7 @@ public class CMManager extends MManager {
throws CheckConsistencyException, MetadataException {
Node header = group.getHeader();
DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
- localDataMember.syncLeaderWithConsistencyCheck();
+ localDataMember.syncLeaderWithConsistencyCheck(false);
try {
List<ShowTimeSeriesResult> localResult = super.showTimeseries(plan, context);
resultSet.addAll(localResult);
@@ -1412,4 +1415,14 @@ public class CMManager extends MManager {
getAllPathsResult.setAliasList(alias);
return getAllPathsResult;
}
+
+ @Override
+ public PartialPath getStorageGroupPath(PartialPath path) throws StorageGroupNotSetException {
+ try {
+ return super.getStorageGroupPath(path);
+ } catch (StorageGroupNotSetException e) {
+ metaGroupMember.syncLeader();
+ return super.getStorageGroupPath(path);
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index c7f7b36..8b7116b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -57,13 +57,11 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
@@ -101,14 +99,14 @@ public class ClusterPlanExecutor extends PlanExecutor {
return processDataQuery((QueryPlan) queryPlan, context);
} else if (queryPlan instanceof ShowPlan) {
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new QueryProcessException(e.getMessage());
}
return processShowQuery((ShowPlan) queryPlan, context);
} else if (queryPlan instanceof AuthorPlan) {
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new QueryProcessException(e.getMessage());
}
@@ -132,7 +130,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
protected int getNodesNumInGivenLevel(PartialPath path, int level) throws MetadataException {
// make sure this node knows all storage groups
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new MetadataException(e);
}
@@ -180,7 +178,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
// this node is a member of the group, perform a local query after synchronizing with the
// leader
metaGroupMember.getLocalDataMember(partitionGroup.getHeader())
- .syncLeaderWithConsistencyCheck();
+ .syncLeaderWithConsistencyCheck(false);
int localResult = getLocalPathCount(pathUnderSG, level);
logger.debug("{}: get path count of {} locally, result {}", metaGroupMember.getName(),
partitionGroup, localResult);
@@ -341,7 +339,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
int level) throws CheckConsistencyException, MetadataException {
Node header = group.getHeader();
DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
- localDataMember.syncLeaderWithConsistencyCheck();
+ localDataMember.syncLeaderWithConsistencyCheck(false);
try {
return IoTDB.metaManager.getNodesList(schemaPattern, level,
new SlotSgFilter(
@@ -449,7 +447,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
throws CheckConsistencyException {
Node header = group.getHeader();
DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
- localDataMember.syncLeaderWithConsistencyCheck();
+ localDataMember.syncLeaderWithConsistencyCheck(false);
try {
return IoTDB.metaManager.getChildNodePathInNextLevel(path);
} catch (MetadataException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 77c701e..3f3880e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -160,7 +160,7 @@ public class LocalQueryExecutor {
throws CheckConsistencyException, QueryProcessException, StorageEngineException, IOException {
logger.debug("{}: {} is querying {}, queryId: {}", name, request.getRequester(),
request.getPath(), request.getQueryId());
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
PartialPath path = null;
try {
@@ -218,7 +218,7 @@ public class LocalQueryExecutor {
throws CheckConsistencyException, MetadataException {
// try to synchronize with the leader first in case that some schema logs are accepted but
// not committed yet
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
// collect local timeseries schemas and send to the requester
// the measurements in them are the full paths.
@@ -259,7 +259,7 @@ public class LocalQueryExecutor {
throws CheckConsistencyException, IllegalPathException {
// try to synchronize with the leader first in case that some schema logs are accepted but
// not committed yet
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
// collect local timeseries schemas and send to the requester
// the measurements in them are the full paths.
@@ -301,7 +301,7 @@ public class LocalQueryExecutor {
logger
.debug("{}: {} is querying {} by timestamp, queryId: {}", name, request.getRequester(),
request.getPath(), request.getQueryId());
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
PartialPath path = null;
try {
@@ -334,7 +334,7 @@ public class LocalQueryExecutor {
public ByteBuffer getAllMeasurementSchema(ByteBuffer planBuffer)
throws CheckConsistencyException, IOException, MetadataException {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
ShowTimeSeriesPlan plan = (ShowTimeSeriesPlan) PhysicalPlan.Factory.create(planBuffer);
List<ShowTimeSeriesResult> allTimeseriesSchema;
@@ -413,7 +413,7 @@ public class LocalQueryExecutor {
Filter timeFilter, QueryContext context, boolean ascending)
throws IOException, StorageEngineException, QueryProcessException {
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new QueryProcessException(e.getMessage());
}
@@ -447,7 +447,7 @@ public class LocalQueryExecutor {
*/
public List<String> getUnregisteredTimeseries(List<String> timeseriesList)
throws CheckConsistencyException {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
List<String> result = new ArrayList<>();
for (String seriesPath : timeseriesList) {
@@ -483,7 +483,7 @@ public class LocalQueryExecutor {
throws StorageEngineException, QueryProcessException {
// pull the newest data
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new StorageEngineException(e);
}
@@ -628,7 +628,7 @@ public class LocalQueryExecutor {
long beforeRange, Set<String> deviceMeasurements, QueryContext context)
throws QueryProcessException, StorageEngineException, IOException {
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new QueryProcessException(e.getMessage());
}
@@ -640,7 +640,7 @@ public class LocalQueryExecutor {
public int getPathCount(List<String> pathsToQuery, int level)
throws CheckConsistencyException, MetadataException {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
int count = 0;
for (String s : pathsToQuery) {
@@ -656,7 +656,7 @@ public class LocalQueryExecutor {
@SuppressWarnings("java:S1135") // ignore todos
public ByteBuffer last(LastQueryRequest request)
throws CheckConsistencyException, QueryProcessException, IOException, StorageEngineException, IllegalPathException {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
RemoteQueryContext queryContext = queryManager
.getQueryContext(request.getRequestor(), request.getQueryId(), DEFAULT_FETCH_SIZE, -1);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
index a711021..99067bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
@@ -78,7 +78,7 @@ public class ClusterAggregator {
QueryContext context, boolean ascending) throws StorageEngineException {
// make sure the partition table is new
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new StorageEngineException(e);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
index c78b6d9..3887b34 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
@@ -91,7 +91,7 @@ public class ClusterPreviousFill extends PreviousFill {
throws StorageEngineException {
// make sure the partition table is new
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new StorageEngineException(e);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index 0787df5..1dac967 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -87,7 +87,7 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
throws IOException, QueryProcessException {
// calculate the global last from all data groups
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new IOException(e);
}
@@ -174,7 +174,7 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
throws StorageEngineException, QueryProcessException, IOException {
DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader());
try {
- localDataMember.syncLeaderWithConsistencyCheck();
+ localDataMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new QueryProcessException(e.getMessage());
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 9c05214..3baab29 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -93,7 +93,7 @@ public class ClusterReaderFactory {
throws StorageEngineException, QueryProcessException {
// make sure the partition table is new
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new QueryProcessException(e.getMessage());
}
@@ -186,7 +186,7 @@ public class ClusterReaderFactory {
throws StorageEngineException, EmptyIntervalException {
// make sure the partition table is new
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new StorageEngineException(e);
}
@@ -262,7 +262,7 @@ public class ClusterReaderFactory {
throws StorageEngineException, QueryProcessException {
// pull the newest data
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new StorageEngineException(e);
}
@@ -368,7 +368,7 @@ public class ClusterReaderFactory {
throws StorageEngineException, QueryProcessException {
// make sure the partition table is new
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new QueryProcessException(e.getMessage());
}
@@ -524,7 +524,7 @@ public class ClusterReaderFactory {
throws StorageEngineException, QueryProcessException, IOException {
// pull the newest data
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new StorageEngineException(e);
}
@@ -551,7 +551,7 @@ public class ClusterReaderFactory {
TSDataType dataType, QueryContext context, DataGroupMember dataGroupMember, boolean ascending)
throws StorageEngineException, QueryProcessException {
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new StorageEngineException(e);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index d279970..7f408de 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -24,13 +24,19 @@ import java.util.List;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.db.utils.SerializeUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,11 +87,7 @@ public class DataSourceInfo {
Node node = nodes.get(nextNodePos);
logger.debug("querying {} from {} of {}", request.path, node, partitionGroup.getHeader());
try {
-
- AsyncDataClient client = this.metaGroupMember
- .getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
- Long newReaderId = applyForReaderId(client, byTimestamp, timestamp);
-
+ Long newReaderId = getReaderId(node, byTimestamp, timestamp);
if (newReaderId != null) {
logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node);
if (newReaderId != -1) {
@@ -121,8 +123,18 @@ public class DataSourceInfo {
return false;
}
- private Long applyForReaderId(AsyncDataClient client, boolean byTimestamp, long timestamp)
- throws TException, InterruptedException {
+ private Long getReaderId(Node node, boolean byTimestamp, long timestamp)
+ throws TException, InterruptedException, IOException {
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ return applyForReaderIdAsync(node, byTimestamp, timestamp);
+ }
+ return applyForReaderIdSync(node, byTimestamp, timestamp);
+ }
+
+ private Long applyForReaderIdAsync(Node node, boolean byTimestamp, long timestamp)
+ throws TException, InterruptedException, IOException {
+ AsyncDataClient client = this.metaGroupMember
+ .getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
Long newReaderId;
if (byTimestamp) {
newReaderId = SyncClientAdaptor.querySingleSeriesByTimestamp(client, request);
@@ -132,6 +144,32 @@ public class DataSourceInfo {
return newReaderId;
}
+ private Long applyForReaderIdSync(Node node, boolean byTimestamp, long timestamp)
+ throws TException {
+ SyncDataClient client = this.metaGroupMember
+ .getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+ Long newReaderId;
+ try {
+ if (byTimestamp) {
+ newReaderId = client.querySingleSeriesByTimestamp(request);
+ } else {
+ Filter newFilter;
+ // add the timestamp as a time filter to skip the data that has already been read
+ if (request.isSetTimeFilterBytes()) {
+ Filter timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
+ newFilter = new AndFilter(timeFilter, TimeFilter.gt(timestamp));
+ } else {
+ newFilter = TimeFilter.gt(timestamp);
+ }
+ request.setTimeFilterBytes(SerializeUtils.serializeFilter(newFilter));
+ newReaderId = client.querySingleSeries(request);
+ }
+ return newReaderId;
+ } finally {
+ client.putBack();
+ }
+ }
+
public long getReaderId() {
return this.readerId;
}
@@ -149,7 +187,8 @@ public class DataSourceInfo {
}
AsyncDataClient getCurAsyncClient(int timeout) throws IOException {
- return isNoClient ? null : metaGroupMember.getClientProvider().getAsyncDataClient(this.curSource, timeout);
+ return isNoClient ? null
+ : metaGroupMember.getClientProvider().getAsyncDataClient(this.curSource, timeout);
}
SyncDataClient getCurSyncClient(int timeout) {
@@ -161,7 +200,7 @@ public class DataSourceInfo {
return this.isNoData;
}
- private boolean isNoClient(){
+ private boolean isNoClient() {
return this.isNoClient;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 1e8a277..391cf69 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -202,7 +202,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
partitionGroup = partitionTable.getHeaderGroup(header);
if (partitionGroup == null || !partitionGroup.contains(thisNode)) {
// if the partition table is old, this node may have not been moved to the new group
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(true);
partitionGroup = partitionTable.getHeaderGroup(header);
}
if (partitionGroup != null && partitionGroup.contains(thisNode)) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
index 255041c..04dfe50 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
@@ -23,6 +23,7 @@ package org.apache.iotdb.cluster.server;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.server.member.RaftMember;
public class Timer {
@@ -86,6 +87,13 @@ public class Timer {
RAFT_SENDER_WAIT_FOR_PREV_LOG(
RAFT_MEMBER_SENDER, "sender wait for prev log", TIME_SCALE, true,
RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+ RAFT_SENDER_SERIALIZE_LOG(
+ RAFT_MEMBER_SENDER, "serialize logs", TIME_SCALE, true,
+ RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+ RAFT_SENDER_SEND_LOG_ASYNC(
+ RAFT_MEMBER_SENDER, "send log async", TIME_SCALE,
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
RAFT_SENDER_SEND_LOG(
RAFT_MEMBER_SENDER, "send log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
RAFT_SENDER_VOTE_COUNTER(
@@ -136,11 +144,12 @@ public class Timer {
RaftMember.USE_LOG_DISPATCHER, DATA_GROUP_MEMBER_LOCAL_EXECUTION),
// raft member - receiver
RAFT_RECEIVER_LOG_PARSE(
- RAFT_MEMBER_RECEIVER, "log parse", TIME_SCALE, true, RAFT_SENDER_SEND_LOG),
+ RAFT_MEMBER_RECEIVER, "log parse", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
RAFT_RECEIVER_WAIT_FOR_PREV_LOG(
- RAFT_MEMBER_RECEIVER, "receiver wait for prev log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG),
+ RAFT_MEMBER_RECEIVER, "receiver wait for prev log", TIME_SCALE, true,
+ RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
RAFT_RECEIVER_APPEND_ENTRY(
- RAFT_MEMBER_RECEIVER, "append entrys", TIME_SCALE, true, RAFT_SENDER_SEND_LOG),
+ RAFT_MEMBER_RECEIVER, "append entrys", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
RAFT_RECEIVER_INDEX_DIFF(
RAFT_MEMBER_RECEIVER, "index diff", 1.0, true, ROOT),
// log dispatcher
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index b2ea3f4..8720c65 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -61,13 +61,17 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
public AppendNodeEntryHandler() {
- if (Timer.ENABLE_INSTRUMENTING && ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ if (Timer.ENABLE_INSTRUMENTING && ClusterDescriptor.getInstance().getConfig()
+ .isUseAsyncServer()) {
sendStart = System.nanoTime();
}
}
@Override
public void onComplete(Long response) {
+ if (Timer.ENABLE_INSTRUMENTING) {
+ Statistic.RAFT_SENDER_SEND_LOG_ASYNC.calOperationCostTimeFromStart(sendStart);
+ }
logger.debug("{}: Append response {} from {}", member.getName(), response, receiver);
if (leaderShipStale.get()) {
// someone has rejected this log because the leadership is stale
@@ -85,9 +89,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
voteCounter.notifyAll();
}
peer.setMatchIndex(Math.max(log.getCurrLogIndex(), peer.getMatchIndex()));
- if (Timer.ENABLE_INSTRUMENTING) {
- Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(sendStart);
- }
} else if (resp > 0) {
// a response > 0 is the follower's term
// the leader ship is stale, wait for the new leader's heartbeat
@@ -100,17 +101,11 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
}
leaderShipStale.set(true);
voteCounter.notifyAll();
- if (Timer.ENABLE_INSTRUMENTING) {
- Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(sendStart);
- }
} else {
//e.g., Response.RESPONSE_LOG_MISMATCH
logger.debug("{}: The log {} is rejected by {} because: {}", member.getName(), log,
receiver, resp);
onFail();
- if (Timer.ENABLE_INSTRUMENTING) {
- Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(sendStart);
- }
}
// rejected because the receiver's logs are stale or the receiver has no cluster info, just
// wait for the heartbeat to handle
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index ca8de8d..b02f7a8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1412,7 +1412,7 @@ public class MetaGroupMember extends RaftMember {
}
}
try {
- syncLeaderWithConsistencyCheck();
+ syncLeaderWithConsistencyCheck(true);
List<PartitionGroup> globalGroups = partitionTable.getGlobalGroups();
logger.debug("Forwarding global data plan {} to {} groups", plan, globalGroups.size());
return forwardPlan(globalGroups, plan);
@@ -1475,7 +1475,7 @@ public class MetaGroupMember extends RaftMember {
planGroupMap = router.splitAndRoutePlan(plan);
} catch (StorageGroupNotSetException e) {
// synchronize with the leader to see if this node has unpulled storage groups
- syncLeaderWithConsistencyCheck();
+ syncLeaderWithConsistencyCheck(true);
try {
planGroupMap = router.splitAndRoutePlan(plan);
} catch (MetadataException ex) {
@@ -1514,26 +1514,39 @@ public class MetaGroupMember extends RaftMember {
if (plan instanceof InsertPlan
&& status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
&& ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
- // try to create timeseries
- if (((InsertPlan) plan).getFailedMeasurements() != null) {
- ((InsertPlan) plan).getPlanFromFailed();
- }
- boolean hasCreate;
- try {
- hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries((InsertPlan) plan);
- } catch (IllegalPathException e) {
- return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
- }
- if (hasCreate) {
- status = forwardPlan(planGroupMap, plan);
- } else {
- logger.error("{}, Cannot auto create timeseries.", thisNode);
+ TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, ((InsertPlan) plan));
+ if (tmpStatus != null) {
+ status = tmpStatus;
}
}
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() && status
+ .isSetRedirectNode()) {
+ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ }
logger.debug("{}: executed {} with answer {}", name, plan, status);
return status;
}
+ private TSStatus createTimeseriesForFailedInsertion(
+ Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+ // try to create timeseries
+ if (plan.getFailedMeasurements() != null) {
+ plan.getPlanFromFailed();
+ }
+ boolean hasCreate;
+ try {
+ hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+ } catch (IllegalPathException e) {
+ return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
+ }
+ if (hasCreate) {
+ return forwardPlan(planGroupMap, plan);
+ } else {
+ logger.error("{}, Cannot auto create timeseries.", thisNode);
+ }
+ return null;
+ }
+
/**
* Forward each sub-plan to its belonging data group, and combine responses from the groups.
*
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 67e0651..a64292c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -572,7 +572,7 @@ public abstract class RaftMember {
return getSyncClient(syncHeartbeatClientPool, node);
}
- private void sendLogAsync(Log log, AtomicInteger voteCounter, Node node,
+ public void sendLogAsync(Log log, AtomicInteger voteCounter, Node node,
AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, AppendEntryRequest request,
Peer peer) {
AsyncClient client = getSendLogAsyncClient(node);
@@ -740,27 +740,34 @@ public abstract class RaftMember {
/**
* according to the consistency configuration, decide whether to execute syncLeader or not and
- * throws exception when failed
+ * throws exception when failed. Note that the write request will always try to sync leader
*/
- public void syncLeaderWithConsistencyCheck() throws CheckConsistencyException {
- switch (ClusterDescriptor.getInstance().getConfig().getConsistencyLevel()) {
- case STRONG_CONSISTENCY:
- if (!syncLeader()) {
- throw CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION;
- }
- return;
- case MID_CONSISTENCY:
- // do not care success or not
- syncLeader();
- return;
- case WEAK_CONSISTENCY:
- // do nothing
- return;
- default:
- // this should not happen in theory
- throw new CheckConsistencyException(
- "unknown consistency=" + ClusterDescriptor.getInstance().getConfig()
- .getConsistencyLevel().name());
+ public void syncLeaderWithConsistencyCheck(boolean isWriteRequest)
+ throws CheckConsistencyException {
+ if (isWriteRequest) {
+ if (!syncLeader()) {
+ throw CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION;
+ }
+ } else {
+ switch (ClusterDescriptor.getInstance().getConfig().getConsistencyLevel()) {
+ case STRONG_CONSISTENCY:
+ if (!syncLeader()) {
+ throw CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION;
+ }
+ return;
+ case MID_CONSISTENCY:
+ // do not care success or not
+ syncLeader();
+ return;
+ case WEAK_CONSISTENCY:
+ // do nothing
+ return;
+ default:
+ // this should not happen in theory
+ throw new CheckConsistencyException(
+ "unknown consistency=" + ClusterDescriptor.getInstance().getConfig()
+ .getConsistencyLevel().name());
+ }
}
}
@@ -1616,9 +1623,7 @@ public abstract class RaftMember {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer);
} else {
- startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer);
- Timer.Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 0e8f3b5..3e3accf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -211,7 +211,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
public void getAllPaths(Node header, List<String> paths, boolean withAlias,
AsyncMethodCallback<GetAllPathsResult> resultHandler) {
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
resultHandler.onComplete(((CMManager) IoTDB.metaManager).getAllPaths(paths, withAlias));
} catch (MetadataException | CheckConsistencyException e) {
resultHandler.onError(e);
@@ -222,7 +222,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
public void getAllDevices(Node header, List<String> path,
AsyncMethodCallback<Set<String>> resultHandler) {
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
resultHandler.onComplete(((CMManager) IoTDB.metaManager).getAllDevices(path));
} catch (MetadataException | CheckConsistencyException e) {
resultHandler.onError(e);
@@ -233,7 +233,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
public void getNodeList(Node header, String path, int nodeLevel,
AsyncMethodCallback<List<String>> resultHandler) {
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
resultHandler.onComplete(((CMManager) IoTDB.metaManager).getNodeList(path, nodeLevel));
} catch (CheckConsistencyException | MetadataException e) {
resultHandler.onError(e);
@@ -244,7 +244,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
public void getChildNodePathInNextLevel(Node header, String path,
AsyncMethodCallback<Set<String>> resultHandler) {
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
resultHandler.onComplete(((CMManager) IoTDB.metaManager).getChildNodePathInNextLevel(path));
} catch (CheckConsistencyException | MetadataException e) {
resultHandler.onError(e);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 53a6a12..7ce1c66 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -215,7 +215,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
public GetAllPathsResult getAllPaths(Node header, List<String> paths, boolean withAlias)
throws TException {
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
return ((CMManager) IoTDB.metaManager).getAllPaths(paths, withAlias);
} catch (MetadataException | CheckConsistencyException e) {
throw new TException(e);
@@ -225,7 +225,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
@Override
public Set<String> getAllDevices(Node header, List<String> path) throws TException {
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
return ((CMManager) IoTDB.metaManager).getAllDevices(path);
} catch (MetadataException | CheckConsistencyException e) {
throw new TException(e);
@@ -235,7 +235,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
@Override
public List<String> getNodeList(Node header, String path, int nodeLevel) throws TException {
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
return ((CMManager) IoTDB.metaManager).getNodeList(path, nodeLevel);
} catch (CheckConsistencyException | MetadataException e) {
throw new TException(e);
@@ -245,7 +245,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
@Override
public Set<String> getChildNodePathInNextLevel(Node header, String path) throws TException {
try {
- dataGroupMember.syncLeaderWithConsistencyCheck();
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
return ((CMManager) IoTDB.metaManager).getChildNodePathInNextLevel(path);
} catch (CheckConsistencyException | MetadataException e) {
throw new TException(e);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index d334e06..8346ddc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -319,7 +319,7 @@ public class ClusterUtils {
// the storage group is not found locally, but may be found in the leader, retry after
// synchronizing with the leader
try {
- metaGroupMember.syncLeaderWithConsistencyCheck();
+ metaGroupMember.syncLeaderWithConsistencyCheck(true);
} catch (CheckConsistencyException checkConsistencyException) {
throw new MetadataException(checkConsistencyException.getMessage());
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
index 957f441..b89d058 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
@@ -138,7 +138,7 @@ public abstract class DataSnapshotTest {
// do nothing
metaGroupMember = new TestMetaGroupMember() {
@Override
- public void syncLeaderWithConsistencyCheck() {
+ public void syncLeaderWithConsistencyCheck(boolean isWriteRequest) {
// do nothing
}
};
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 839ac72..4f8a07b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@ -307,81 +307,109 @@ public class MemberTest {
}
@Test
- public void testSyncLeaderWithConsistencyCheck() {
- // 1. Strong consistency level with syncLeader false
+ public void testsyncLeaderWithConsistencyCheck() {
+
+ // 1. write request : Strong consistency level with syncLeader false
+ DataGroupMember dataGroupMemberWithWriteStrongConsistencyFalse = newDataGroupMemberWithSyncLeader(
+ TestUtils.getNode(0), false);
+ ClusterDescriptor.getInstance().getConfig()
+ .setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
+ CheckConsistencyException exception = null;
+ try {
+ dataGroupMemberWithWriteStrongConsistencyFalse.syncLeaderWithConsistencyCheck(true);
+ } catch (CheckConsistencyException e) {
+ exception = e;
+ }
+ Assert.assertNotNull(exception);
+ Assert.assertEquals(CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION, exception);
+
+ // 2. write request : Strong consistency level with syncLeader true
+ DataGroupMember dataGroupMemberWithWriteStrongConsistencyTrue = newDataGroupMemberWithSyncLeader(
+ TestUtils.getNode(0), true);
+ ClusterDescriptor.getInstance().getConfig()
+ .setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
+ exception = null;
+ try {
+ dataGroupMemberWithWriteStrongConsistencyTrue.syncLeaderWithConsistencyCheck(true);
+ } catch (CheckConsistencyException e) {
+ exception = e;
+ }
+ Assert.assertNull(exception);
+
+ // 3. Strong consistency level with syncLeader false
DataGroupMember dataGroupMemberWithStrongConsistencyFalse = newDataGroupMemberWithSyncLeader(
TestUtils.getNode(0), false);
ClusterDescriptor.getInstance().getConfig()
.setConsistencyLevel(ConsistencyLevel.STRONG_CONSISTENCY);
- CheckConsistencyException exception = null;
+ exception = null;
try {
- dataGroupMemberWithStrongConsistencyFalse.syncLeaderWithConsistencyCheck();
+ dataGroupMemberWithStrongConsistencyFalse.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
exception = e;
}
Assert.assertNotNull(exception);
Assert.assertEquals(CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION, exception);
- // 2. Strong consistency level with syncLeader true
+ // 4. Strong consistency level with syncLeader true
DataGroupMember dataGroupMemberWithStrongConsistencyTrue = newDataGroupMemberWithSyncLeader(
TestUtils.getNode(0), true);
ClusterDescriptor.getInstance().getConfig()
.setConsistencyLevel(ConsistencyLevel.STRONG_CONSISTENCY);
exception = null;
try {
- dataGroupMemberWithStrongConsistencyTrue.syncLeaderWithConsistencyCheck();
+ dataGroupMemberWithStrongConsistencyTrue.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
exception = e;
}
Assert.assertNull(exception);
- // 3. Mid consistency level with syncLeader false
+ // 5. Mid consistency level with syncLeader false
DataGroupMember dataGroupMemberWithMidConsistencyFalse = newDataGroupMemberWithSyncLeader(
TestUtils.getNode(0), false);
ClusterDescriptor.getInstance().getConfig()
.setConsistencyLevel(ConsistencyLevel.MID_CONSISTENCY);
exception = null;
try {
- dataGroupMemberWithMidConsistencyFalse.syncLeaderWithConsistencyCheck();
+ dataGroupMemberWithMidConsistencyFalse.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
exception = e;
}
Assert.assertNull(exception);
- // 4. Mid consistency level with syncLeader true
+ // 6. Mid consistency level with syncLeader true
DataGroupMember dataGroupMemberWithMidConsistencyTrue = newDataGroupMemberWithSyncLeader(
TestUtils.getNode(0), true);
ClusterDescriptor.getInstance().getConfig()
.setConsistencyLevel(ConsistencyLevel.MID_CONSISTENCY);
exception = null;
try {
- dataGroupMemberWithMidConsistencyTrue.syncLeaderWithConsistencyCheck();
+ dataGroupMemberWithMidConsistencyTrue.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
exception = e;
}
Assert.assertNull(exception);
- // 5. Weak consistency level with syncLeader false
+ // 7. Weak consistency level with syncLeader false
DataGroupMember dataGroupMemberWithWeakConsistencyFalse = newDataGroupMemberWithSyncLeader(
TestUtils.getNode(0), false);
ClusterDescriptor.getInstance().getConfig()
.setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
exception = null;
try {
- dataGroupMemberWithWeakConsistencyFalse.syncLeaderWithConsistencyCheck();
+ dataGroupMemberWithWeakConsistencyFalse.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
exception = e;
}
Assert.assertNull(exception);
- // 6. Weak consistency level with syncLeader true
+ // 8. Weak consistency level with syncLeader true
DataGroupMember dataGroupMemberWithWeakConsistencyTrue = newDataGroupMemberWithSyncLeader(
TestUtils.getNode(0), true);
ClusterDescriptor.getInstance().getConfig()
.setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
exception = null;
try {
- dataGroupMemberWithWeakConsistencyTrue.syncLeaderWithConsistencyCheck();
+ dataGroupMemberWithWeakConsistencyTrue.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
exception = e;
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 07e8071..32b0b30 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -734,6 +734,9 @@ public class MetaGroupMemberTest extends MemberTest {
schema.getEncodingType(), schema.getCompressor(), schema.getProps(),
Collections.emptyMap(), Collections.emptyMap(), null);
status = testMetaMember.executeNonQueryPlan(createTimeSeriesPlan);
+ if (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+ status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.code);
assertTrue(IoTDB.metaManager.isPathExist(new PartialPath(TestUtils.getTestSeries(i, 0))));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index e99565d..b2ff57c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -1507,30 +1508,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
- List<TSStatus> statusList = new ArrayList<>();
+ List<TSStatus> statusList = insertTabletsInternal(req);
boolean isAllSuccessful = true;
- for (int i = 0; i < req.deviceIds.size(); i++) {
- InsertTabletPlan insertTabletPlan = new InsertTabletPlan(
- new PartialPath(req.deviceIds.get(i)),
- req.measurementsList.get(i));
- insertTabletPlan.setTimes(
- QueryDataSetUtils.readTimesFromBuffer(req.timestampsList.get(i), req.sizeList.get(i)));
- insertTabletPlan.setColumns(
- QueryDataSetUtils.readValuesFromBuffer(
- req.valuesList.get(i), req.typesList.get(i), req.measurementsList.get(i).size(),
- req.sizeList.get(i)));
- insertTabletPlan.setRowCount(req.sizeList.get(i));
- insertTabletPlan.setDataTypes(req.typesList.get(i));
-
- TSStatus status = checkAuthority(insertTabletPlan, req.getSessionId());
- if (status == null) {
- status = executeNonQueryPlan(insertTabletPlan);
- isAllSuccessful =
- ((status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())
- && isAllSuccessful);
- }
- statusList.add(status);
+ for (TSStatus subStatus : statusList) {
+ isAllSuccessful =
+ ((subStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())
+ && isAllSuccessful);
}
+
if (!isAllSuccessful) {
return RpcUtils.getStatus(statusList);
}
@@ -1544,6 +1529,31 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
+ public List<TSStatus> insertTabletsInternal(TSInsertTabletsReq req) throws IllegalPathException {
+ List<TSStatus> statusList = new ArrayList<>();
+
+ for (int i = 0; i < req.deviceIds.size(); i++) {
+ InsertTabletPlan insertTabletPlan = new InsertTabletPlan(
+ new PartialPath(req.deviceIds.get(i)),
+ req.measurementsList.get(i));
+ insertTabletPlan.setTimes(
+ QueryDataSetUtils.readTimesFromBuffer(req.timestampsList.get(i), req.sizeList.get(i)));
+ insertTabletPlan.setColumns(
+ QueryDataSetUtils.readValuesFromBuffer(
+ req.valuesList.get(i), req.typesList.get(i), req.measurementsList.get(i).size(),
+ req.sizeList.get(i)));
+ insertTabletPlan.setRowCount(req.sizeList.get(i));
+ insertTabletPlan.setDataTypes(req.typesList.get(i));
+
+ TSStatus status = checkAuthority(insertTabletPlan, req.getSessionId());
+ if (status == null) {
+ status = executeNonQueryPlan(insertTabletPlan);
+ }
+ statusList.add(status);
+ }
+ return statusList;
+ }
+
@Override
public TSStatus setStorageGroup(long sessionId, String storageGroup) {
try {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
index d8f39d0..c7989b6 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
@@ -19,19 +19,32 @@
package org.apache.iotdb.rpc;
+import java.util.Map;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
public class RedirectException extends Exception {
private final EndPoint endPoint;
+ private final Map<String,EndPoint> deviceEndPointMap;
+
public RedirectException(EndPoint endpoint) {
super("later request in same group will be redirected to " + endpoint.toString());
this.endPoint = endpoint;
+ this.deviceEndPointMap = null;
+ }
+
+ public RedirectException(Map<String,EndPoint> deviceEndPointMap) {
+ super("later request in same group will be redirected to " + deviceEndPointMap);
+ this.endPoint = null;
+ this.deviceEndPointMap = deviceEndPointMap;
}
public EndPoint getEndPoint() {
return this.endPoint;
}
+ public Map<String, EndPoint> getDeviceEndPointMap() {
+ return deviceEndPointMap;
+ }
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 8149dc5..7131009 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -19,10 +19,14 @@
package org.apache.iotdb.rpc;
import java.lang.reflect.Proxy;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
public class RpcUtils {
@@ -50,6 +54,9 @@ public class RpcUtils {
verifySuccess(status.getSubStatus());
return;
}
+ if (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+ return;
+ }
if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new StatementExecutionException(status);
}
@@ -63,10 +70,28 @@ public class RpcUtils {
}
}
+ public static void verifySuccessWithRedirectionForInsertTablets(TSStatus status,
+ TSInsertTabletsReq req)
+ throws StatementExecutionException, RedirectException {
+ verifySuccess(status);
+ if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ Map<String, EndPoint> deviceEndPointMap = new HashMap<>();
+ List<TSStatus> statusSubStatus = status.getSubStatus();
+ for (int i = 0; i < statusSubStatus.size(); i++) {
+ TSStatus subStatus = statusSubStatus.get(i);
+ if (subStatus.isSetRedirectNode()) {
+ deviceEndPointMap.put(req.getDeviceIds().get(i), subStatus.getRedirectNode());
+ }
+ }
+ throw new RedirectException(deviceEndPointMap);
+ }
+ }
+
public static void verifySuccess(List<TSStatus> statuses) throws BatchExecutionException {
StringBuilder errMsgs = new StringBuilder();
for (TSStatus status : statuses) {
- if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && status.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
errMsgs.append(status.getMessage()).append(";");
}
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index d70616f..29b0596 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -86,6 +86,7 @@ public enum TSStatusCode {
NODE_READ_ONLY(704),
CONSISTENCY_FAILURE(705),
NO_CONNECTION(706),
+ NEED_REDIRECTION(707)
;
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 7a4c0c9..333bdbc 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -44,7 +44,6 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -153,8 +152,7 @@ public class Session {
}
}
- public synchronized String getTimeZone()
- throws StatementExecutionException, IoTDBConnectionException {
+ public synchronized String getTimeZone() {
return defaultSessionConnection.getTimeZone();
}
@@ -310,13 +308,6 @@ public class Session {
return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
}
- private TSRawDataQueryReq genTSRawDataQueryReq(List<String> paths, long startTime, long endTime) {
- TSRawDataQueryReq request = new TSRawDataQueryReq();
- request.setPaths(paths);
- request.setStartTime(startTime);
- request.setEndTime(endTime);
- return request;
- }
/**
* insert data in one row, if you want to improve your performance, please use insertRecords
@@ -338,7 +329,7 @@ public class Session {
try {
getSessionConnection(deviceId).insertRecord(request);
} catch (RedirectException e) {
- handleRedirection(deviceId, e);
+ handleRedirection(deviceId, e.getEndPoint());
}
}
@@ -347,7 +338,7 @@ public class Session {
try {
getSessionConnection(deviceId).insertRecord(request);
} catch (RedirectException e) {
- handleRedirection(deviceId, e);
+ handleRedirection(deviceId, e.getEndPoint());
}
}
@@ -381,15 +372,14 @@ public class Session {
}
}
- private void handleRedirection(String deviceId, RedirectException e)
+ private void handleRedirection(String deviceId, EndPoint endpoint)
throws IoTDBConnectionException {
if (Config.DEFAULT_CACHE_LEADER_MODE) {
- logger.debug("DeviceId[{}]:{}", deviceId, e.getMessage());
- deviceIdToEndpoint.put(deviceId, e.getEndPoint());
+ deviceIdToEndpoint.put(deviceId, endpoint);
SessionConnection connection = endPointToSessionConnection
- .computeIfAbsent(e.getEndPoint(), k -> {
+ .computeIfAbsent(endpoint, k -> {
try {
- return new SessionConnection(this, e.getEndPoint(), zoneId);
+ return new SessionConnection(this, endpoint, zoneId);
} catch (IoTDBConnectionException ex) {
tmp.set(ex);
return null;
@@ -470,29 +460,8 @@ public class Session {
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
- StringBuilder errMsgBuilder = new StringBuilder("");
if (Config.DEFAULT_CACHE_LEADER_MODE) {
- Map<String, TSInsertStringRecordsReq> deviceGroup = new HashMap<>();
- for (int i = 0; i < deviceIds.size(); i++) {
- TSInsertStringRecordsReq request = deviceGroup
- .computeIfAbsent(deviceIds.get(i), k -> new TSInsertStringRecordsReq());
- updateTSInsertStringRecordsReq(request, deviceIds.get(i), times.get(i),
- measurementsList.get(i), valuesList.get(i));
- }
- //TODO parallel
- for (Entry<String, TSInsertStringRecordsReq> entry : deviceGroup.entrySet()) {
- try {
- getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
- } catch (RedirectException e) {
- handleRedirection(entry.getKey(), e);
- } catch (StatementExecutionException e) {
- errMsgBuilder.append(e.getMessage());
- }
- }
- String errMsg = errMsgBuilder.toString();
- if (!errMsg.isEmpty()) {
- throw new StatementExecutionException(errMsg);
- }
+ insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList);
} else {
TSInsertStringRecordsReq request = genTSInsertStringRecordsReq(deviceIds, times,
measurementsList, valuesList);
@@ -504,6 +473,33 @@ public class Session {
}
}
+  /**
+   * Inserts string-valued records when leader caching is enabled. The records are grouped into
+   * one request per device id, and each request is sent through the connection returned by
+   * getSessionConnection for that device.
+   *
+   * <p>A RedirectException is forwarded to handleRedirection together with the device id. A
+   * StatementExecutionException does not stop the loop: its message is collected and a single
+   * exception carrying all collected messages (one per line) is thrown after every group has
+   * been attempted.
+   *
+   * @param deviceIds device id of each record
+   * @param times timestamp of each record, indexed like deviceIds
+   * @param measurementsList measurement names of each record, indexed like deviceIds
+   * @param valuesList string values of each record, indexed like deviceIds
+   * @throws IoTDBConnectionException propagated from the calls made while sending a request or
+   *     handling a redirection
+   * @throws StatementExecutionException if at least one per-device request reported a failure
+   */
+  private void insertStringRecordsWithLeaderCache(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Map<String, TSInsertStringRecordsReq> deviceGroup = new HashMap<>();
+    for (int i = 0; i < deviceIds.size(); i++) {
+      String deviceId = deviceIds.get(i);
+      TSInsertStringRecordsReq request = deviceGroup
+          .computeIfAbsent(deviceId, k -> new TSInsertStringRecordsReq());
+      updateTSInsertStringRecordsReq(request, deviceId, times.get(i),
+          measurementsList.get(i), valuesList.get(i));
+    }
+    //TODO parallel
+    StringBuilder errMsgBuilder = new StringBuilder();
+    for (Entry<String, TSInsertStringRecordsReq> entry : deviceGroup.entrySet()) {
+      try {
+        getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
+      } catch (RedirectException e) {
+        handleRedirection(entry.getKey(), e.getEndPoint());
+      } catch (StatementExecutionException e) {
+        // Put each failure on its own line so the combined message stays readable.
+        if (errMsgBuilder.length() > 0) {
+          errMsgBuilder.append(System.lineSeparator());
+        }
+        errMsgBuilder.append(e.getMessage());
+      }
+    }
+    if (errMsgBuilder.length() > 0) {
+      throw new StatementExecutionException(errMsgBuilder.toString());
+    }
+  }
+
private TSInsertStringRecordsReq genTSInsertStringRecordsReq(List<String> deviceId,
List<Long> time,
List<List<String>> measurements, List<List<String>> values) {
@@ -543,21 +539,7 @@ public class Session {
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
if (Config.DEFAULT_CACHE_LEADER_MODE) {
- Map<String, TSInsertRecordsReq> deviceGroup = new HashMap<>();
- for (int i = 0; i < deviceIds.size(); i++) {
- TSInsertRecordsReq request = deviceGroup
- .computeIfAbsent(deviceIds.get(i), k -> new TSInsertRecordsReq());
- updateTSInsertRecordsReq(request, deviceIds.get(i), times.get(i),
- measurementsList.get(i), typesList.get(i), valuesList.get(i));
- }
- //TODO parallel
- for (Entry<String, TSInsertRecordsReq> entry : deviceGroup.entrySet()) {
- try {
- getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
- } catch (RedirectException e) {
- handleRedirection(entry.getKey(), e);
- }
- }
+ insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList);
} else {
TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times, measurementsList,
typesList, valuesList);
@@ -570,6 +552,34 @@ public class Session {
}
}
+  /**
+   * Inserts typed records when leader caching is enabled. The records are grouped into one
+   * request per device id, and each request is sent through the connection returned by
+   * getSessionConnection for that device.
+   *
+   * <p>A RedirectException is forwarded to handleRedirection together with the device id. A
+   * StatementExecutionException does not stop the loop: its message is collected and a single
+   * exception carrying all collected messages (one per line) is thrown after every group has
+   * been attempted.
+   *
+   * @param deviceIds device id of each record
+   * @param times timestamp of each record, indexed like deviceIds
+   * @param measurementsList measurement names of each record, indexed like deviceIds
+   * @param typesList data types of each record's values, indexed like deviceIds
+   * @param valuesList values of each record, indexed like deviceIds
+   * @throws IoTDBConnectionException propagated from the calls made while sending a request or
+   *     handling a redirection
+   * @throws StatementExecutionException if at least one per-device request reported a failure
+   */
+  private void insertRecordsWithLeaderCache(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Map<String, TSInsertRecordsReq> deviceGroup = new HashMap<>();
+    for (int i = 0; i < deviceIds.size(); i++) {
+      String deviceId = deviceIds.get(i);
+      TSInsertRecordsReq request = deviceGroup
+          .computeIfAbsent(deviceId, k -> new TSInsertRecordsReq());
+      updateTSInsertRecordsReq(request, deviceId, times.get(i),
+          measurementsList.get(i), typesList.get(i), valuesList.get(i));
+    }
+    //TODO parallel
+    StringBuilder errMsgBuilder = new StringBuilder();
+    for (Entry<String, TSInsertRecordsReq> entry : deviceGroup.entrySet()) {
+      try {
+        getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
+      } catch (RedirectException e) {
+        handleRedirection(entry.getKey(), e.getEndPoint());
+      } catch (StatementExecutionException e) {
+        // Put each failure on its own line so the combined message stays readable.
+        if (errMsgBuilder.length() > 0) {
+          errMsgBuilder.append(System.lineSeparator());
+        }
+        errMsgBuilder.append(e.getMessage());
+      }
+    }
+    if (errMsgBuilder.length() > 0) {
+      throw new StatementExecutionException(errMsgBuilder.toString());
+    }
+  }
+
private TSInsertRecordsReq genTSInsertRecordsReq(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList) throws IoTDBConnectionException {
@@ -619,7 +629,7 @@ public class Session {
defaultSessionConnection.insertTablet(request);
}
} catch (RedirectException e) {
- handleRedirection(tablet.deviceId, e);
+ handleRedirection(tablet.deviceId, e.getEndPoint());
}
}
@@ -641,7 +651,7 @@ public class Session {
defaultSessionConnection.insertTablet(request);
}
} catch (RedirectException e) {
- handleRedirection(tablet.deviceId, e);
+ handleRedirection(tablet.deviceId, e.getEndPoint());
}
}
@@ -689,25 +699,7 @@ public class Session {
public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
if (Config.DEFAULT_CACHE_LEADER_MODE) {
- Map<String, TSInsertTabletsReq> tabletGroup = new HashMap<>();
- for (Entry<String, Tablet> entry : tablets.entrySet()) {
- TSInsertTabletsReq request = tabletGroup
- .computeIfAbsent(entry.getKey(), k -> new TSInsertTabletsReq());
- updateTSInsertTabletsReq(request, entry.getValue(), sorted);
- }
- EndPoint endPoint;
- //TODO parallel
- for (Entry<String, TSInsertTabletsReq> entry : tabletGroup.entrySet()) {
- try {
- if ((endPoint = deviceIdToEndpoint.get(entry.getKey())) != null) {
- endPointToSessionConnection.get(endPoint).insertTablets(entry.getValue());
- } else {
- defaultSessionConnection.insertTablets(entry.getValue());
- }
- } catch (RedirectException e) {
- handleRedirection(entry.getKey(), e);
- }
- }
+ insertTabletsWithLeaderCache(tablets, sorted);
} else {
TSInsertTabletsReq request = genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted);
try {
@@ -718,6 +710,42 @@ public class Session {
}
}
+  /**
+   * Inserts several tablets when leader caching is enabled. Tablets are grouped by the
+   * connection they should be sent through, so every connection receives a single request.
+   *
+   * <p>The connection for a device is looked up through deviceIdToEndpoint and
+   * endPointToSessionConnection; when either lookup yields nothing, defaultSessionConnection is
+   * used. A RedirectException is unpacked into its device-to-endpoint map and each pair is
+   * passed to handleRedirection. A StatementExecutionException does not stop the loop: its
+   * message is collected and a single exception carrying all collected messages (one per line)
+   * is thrown after every group has been attempted.
+   *
+   * @param tablets tablets to insert, keyed by device id
+   * @param sorted forwarded unchanged to updateTSInsertTabletsReq
+   * @throws IoTDBConnectionException propagated from the calls made while sending a request or
+   *     handling a redirection
+   * @throws StatementExecutionException if at least one per-connection request reported a failure
+   */
+  private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, boolean sorted) throws
+      IoTDBConnectionException, StatementExecutionException {
+    Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
+    for (Entry<String, Tablet> entry : tablets.entrySet()) {
+      EndPoint endPoint = deviceIdToEndpoint.get(entry.getKey());
+      SessionConnection connection =
+          endPoint == null ? null : endPointToSessionConnection.get(endPoint);
+      if (connection == null) {
+        // No cached endpoint, or no live connection for it: a null grouping key would only
+        // surface later as a NullPointerException when the request is sent.
+        connection = defaultSessionConnection;
+      }
+      TSInsertTabletsReq request = tabletGroup
+          .computeIfAbsent(connection, k -> new TSInsertTabletsReq());
+      updateTSInsertTabletsReq(request, entry.getValue(), sorted);
+    }
+
+    //TODO parallel
+    StringBuilder errMsgBuilder = new StringBuilder();
+    for (Entry<SessionConnection, TSInsertTabletsReq> entry : tabletGroup.entrySet()) {
+      try {
+        entry.getKey().insertTablets(entry.getValue());
+      } catch (RedirectException e) {
+        for (Entry<String, EndPoint> deviceEndPointEntry : e.getDeviceEndPointMap().entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+        }
+      } catch (StatementExecutionException e) {
+        // Put each failure on its own line so the combined message stays readable.
+        if (errMsgBuilder.length() > 0) {
+          errMsgBuilder.append(System.lineSeparator());
+        }
+        errMsgBuilder.append(e.getMessage());
+      }
+    }
+    if (errMsgBuilder.length() > 0) {
+      throw new StatementExecutionException(errMsgBuilder.toString());
+    }
+  }
+
private TSInsertTabletsReq genTSInsertTabletsReq(List<Tablet> tablets, boolean sorted)
throws BatchExecutionException {
TSInsertTabletsReq request = new TSInsertTabletsReq();
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 62ea7e1..d5ed5eb 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -66,7 +66,8 @@ public class SessionConnection {
private ZoneId zoneId;
private EndPoint endPoint;
- public SessionConnection(Session session, EndPoint endPoint, ZoneId zoneId) throws IoTDBConnectionException {
+ public SessionConnection(Session session, EndPoint endPoint, ZoneId zoneId)
+ throws IoTDBConnectionException {
this.session = session;
this.endPoint = endPoint;
this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId;
@@ -434,7 +435,7 @@ public class SessionConnection {
throws IoTDBConnectionException, StatementExecutionException, RedirectException {
request.setSessionId(sessionId);
try {
- RpcUtils.verifySuccessWithRedirection(client.insertTablets(request));
+ RpcUtils.verifySuccessWithRedirectionForInsertTablets(client.insertTablets(request), request);
} catch (TException e) {
if (reconnect()) {
try {