You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/02/21 03:57:05 UTC
[iotdb] branch master updated: [IOTDB-2557] Fix non-data-read QueryContext in getAllMeasurementSchema (#5072)
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b4ca805 [IOTDB-2557] Fix non-data-read QueryContext in getAllMeasurementSchema (#5072)
b4ca805 is described below
commit b4ca805b5799ce072f1647d05f660fe839ec179d
Author: BaiJian <er...@hotmail.com>
AuthorDate: Mon Feb 21 11:56:27 2022 +0800
[IOTDB-2557] Fix non-data-read QueryContext in getAllMeasurementSchema (#5072)
---
.../cluster/client/sync/SyncClientAdaptor.java | 11 ++---
.../apache/iotdb/cluster/metadata/CMManager.java | 51 ++++++++++++++--------
.../iotdb/cluster/query/LocalQueryExecutor.java | 13 +++---
.../cluster/server/service/DataAsyncService.java | 5 ++-
.../server/service/DataGroupServiceImpls.java | 14 +++---
.../cluster/server/service/DataSyncService.java | 6 +--
.../cluster/client/sync/SyncClientAdaptorTest.java | 24 +++++++---
.../iotdb/cluster/common/TestAsyncDataClient.java | 7 +--
thrift-cluster/src/main/thrift/cluster.thrift | 9 +++-
9 files changed, 85 insertions(+), 55 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 7f16551..796c680 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.MeasurementSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
@@ -55,7 +56,6 @@ import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.SerializeUtils;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -216,18 +216,13 @@ public class SyncClientAdaptor {
}
public static ByteBuffer getAllMeasurementSchema(
- AsyncDataClient client, RaftNode header, ShowTimeSeriesPlan plan)
+ AsyncDataClient client, MeasurementSchemaRequest request)
throws IOException, InterruptedException, TException {
GetTimeseriesSchemaHandler handler = new GetTimeseriesSchemaHandler();
AtomicReference<ByteBuffer> response = new AtomicReference<>(null);
handler.setResponse(response);
handler.setContact(client.getNode());
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
- plan.serialize(dataOutputStream);
-
- client.getAllMeasurementSchema(
- header, ByteBuffer.wrap(byteArrayOutputStream.toByteArray()), handler);
+ client.getAllMeasurementSchema(request, handler);
synchronized (response) {
if (response.get() == null) {
response.wait(ClusterConstant.getReadOperationTimeoutMS());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index a338100..3402a3e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -29,8 +29,10 @@ import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
+import org.apache.iotdb.cluster.rpc.thrift.MeasurementSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.server.handlers.caller.ShowTimeSeriesHandler;
@@ -1546,7 +1548,7 @@ public class CMManager extends MManager {
if (group.contains(metaGroupMember.getThisNode())) {
showLocalTimeseries(group, plan, context, handler);
} else {
- showRemoteTimeseries(group, plan, handler);
+ showRemoteTimeseries(group, plan, context, handler);
}
}
@@ -1593,11 +1595,14 @@ public class CMManager extends MManager {
}
private void showRemoteTimeseries(
- PartitionGroup group, ShowTimeSeriesPlan plan, ShowTimeSeriesHandler handler) {
+ PartitionGroup group,
+ ShowTimeSeriesPlan plan,
+ QueryContext context,
+ ShowTimeSeriesHandler handler) {
ByteBuffer resultBinary = null;
for (Node node : group) {
try {
- resultBinary = showRemoteTimeseries(node, group, plan);
+ resultBinary = showRemoteTimeseries(context, node, group, plan);
if (resultBinary != null) {
break;
}
@@ -1606,6 +1611,9 @@ public class CMManager extends MManager {
} catch (InterruptedException e) {
logger.error("Interrupted when getting timeseries schemas in node {}.", node, e);
Thread.currentThread().interrupt();
+ } finally {
+ // record the queried node to release resources later
+ ((RemoteQueryContext) context).registerRemoteNode(node, group.getHeader());
}
}
@@ -1653,32 +1661,41 @@ public class CMManager extends MManager {
}
}
- private ByteBuffer showRemoteTimeseries(Node node, PartitionGroup group, ShowTimeSeriesPlan plan)
+ private ByteBuffer showRemoteTimeseries(
+ QueryContext context, Node node, PartitionGroup group, ShowTimeSeriesPlan plan)
throws IOException, TException, InterruptedException {
ByteBuffer resultBinary;
+ // prepare request
+ MeasurementSchemaRequest request;
+ try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+ plan.serialize(dataOutputStream);
+ request =
+ new MeasurementSchemaRequest(
+ context.getQueryId(),
+ group.getHeader(),
+ node,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+ }
+
+ // execute remote query
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
- resultBinary = SyncClientAdaptor.getAllMeasurementSchema(client, group.getHeader(), plan);
+ resultBinary = SyncClientAdaptor.getAllMeasurementSchema(client, request);
} else {
SyncDataClient syncDataClient = null;
- try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+ try {
syncDataClient =
ClusterIoTDB.getInstance()
.getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
- try {
- plan.serialize(dataOutputStream);
- resultBinary =
- syncDataClient.getAllMeasurementSchema(
- group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.close();
- throw e;
- }
+ resultBinary = syncDataClient.getAllMeasurementSchema(request);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.close();
+ throw e;
} finally {
if (syncDataClient != null) {
syncDataClient.returnSelf();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 8c4cd69..5589d7a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.query.reader.mult.IMultBatchReader;
import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.MeasurementSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
@@ -53,7 +54,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.query.dataset.groupby.GroupByExecutor;
@@ -602,16 +602,15 @@ public class LocalQueryExecutor {
}
}
- public ByteBuffer getAllMeasurementSchema(ByteBuffer planBuffer)
+ public ByteBuffer getAllMeasurementSchema(MeasurementSchemaRequest request)
throws CheckConsistencyException, IOException, MetadataException {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
- ShowTimeSeriesPlan plan = (ShowTimeSeriesPlan) PhysicalPlan.Factory.create(planBuffer);
+ ShowTimeSeriesPlan plan = (ShowTimeSeriesPlan) PhysicalPlan.Factory.create(request.planBinary);
List<ShowTimeSeriesResult> allTimeseriesSchema;
- allTimeseriesSchema =
- getCMManager()
- .showLocalTimeseries(
- plan, new QueryContext(SessionManager.getInstance().requestQueryId(false)));
+ RemoteQueryContext queryContext =
+ queryManager.getQueryContext(request.getRequester(), request.getQueryId());
+ allTimeseriesSchema = getCMManager().showLocalTimeseries(plan, queryContext);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index eaa3ca8..594f983 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.MeasurementSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
@@ -364,10 +365,10 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
@Override
public void getAllMeasurementSchema(
- RaftNode header, ByteBuffer planBinary, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ MeasurementSchemaRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
try {
resultHandler.onComplete(
- dataGroupMember.getLocalQueryExecutor().getAllMeasurementSchema(planBinary));
+ dataGroupMember.getLocalQueryExecutor().getAllMeasurementSchema(request));
} catch (CheckConsistencyException | IOException | MetadataException e) {
resultHandler.onError(e);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index 9b15dc5..4752718 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.MeasurementSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
@@ -341,12 +342,12 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void getAllMeasurementSchema(
- RaftNode header, ByteBuffer planBytes, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ MeasurementSchemaRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
DataAsyncService service =
DataGroupEngine.getInstance()
- .getDataAsyncService(header, resultHandler, "Get all measurement schema");
+ .getDataAsyncService(request.getHeader(), resultHandler, "Get all measurement schema");
if (service != null) {
- service.getAllMeasurementSchema(header, planBytes, resultHandler);
+ service.getAllMeasurementSchema(request, resultHandler);
}
}
@@ -551,11 +552,10 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
}
@Override
- public ByteBuffer getAllMeasurementSchema(RaftNode header, ByteBuffer planBinary)
- throws TException {
+ public ByteBuffer getAllMeasurementSchema(MeasurementSchemaRequest request) throws TException {
return DataGroupEngine.getInstance()
- .getDataSyncService(header)
- .getAllMeasurementSchema(header, planBinary);
+ .getDataSyncService(request.getHeader())
+ .getAllMeasurementSchema(request);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 6d15e48..0777241 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.MeasurementSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
@@ -343,10 +344,9 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
}
@Override
- public ByteBuffer getAllMeasurementSchema(RaftNode header, ByteBuffer planBinary)
- throws TException {
+ public ByteBuffer getAllMeasurementSchema(MeasurementSchemaRequest request) throws TException {
try {
- return dataGroupMember.getLocalQueryExecutor().getAllMeasurementSchema(planBinary);
+ return dataGroupMember.getLocalQueryExecutor().getAllMeasurementSchema(request);
} catch (CheckConsistencyException | IOException | MetadataException e) {
throw new TException(e);
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index eb1ce23..f13f878 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.MeasurementSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
@@ -60,6 +61,8 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.junit.Before;
import org.junit.Test;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -195,9 +198,7 @@ public class SyncClientAdaptorTest {
@Override
public void getAllMeasurementSchema(
- RaftNode header,
- ByteBuffer planBinary,
- AsyncMethodCallback<ByteBuffer> resultHandler) {
+ MeasurementSchemaRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
resultHandler.onComplete(getAllMeasurementSchemaResult);
}
@@ -382,12 +383,21 @@ public class SyncClientAdaptorTest {
assertEquals(
new HashSet<>(Arrays.asList("1", "2", "3")),
SyncClientAdaptor.getNextChildren(dataClient, TestUtils.getRaftNode(0, 0), "root"));
+
+ MeasurementSchemaRequest request;
+ try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+ new ShowTimeSeriesPlan(new PartialPath("root")).serialize(dataOutputStream);
+ request =
+ new MeasurementSchemaRequest(
+ 0,
+ TestUtils.getRaftNode(0, 0),
+ TestUtils.getNode(1),
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+ }
assertEquals(
getAllMeasurementSchemaResult,
- SyncClientAdaptor.getAllMeasurementSchema(
- dataClient,
- TestUtils.getRaftNode(0, 0),
- new ShowTimeSeriesPlan(new PartialPath("root"))));
+ SyncClientAdaptor.getAllMeasurementSchema(dataClient, request));
assertEquals(
measurementSchemas,
SyncClientAdaptor.pullMeasurementSchema(dataClient, new PullSchemaRequest()));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 92f8ae9..a06fa49 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.MeasurementSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
@@ -282,11 +283,11 @@ public class TestAsyncDataClient extends AsyncDataClient {
@Override
public void getAllMeasurementSchema(
- RaftNode header, ByteBuffer planBinary, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ MeasurementSchemaRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
new Thread(
() -> {
- new DataAsyncService(dataGroupMemberMap.get(header))
- .getAllMeasurementSchema(header, planBinary, resultHandler);
+ new DataAsyncService(dataGroupMemberMap.get(request.getHeader()))
+ .getAllMeasurementSchema(request, resultHandler);
})
.start();
}
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index 9a981e0..669bf21 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -269,6 +269,13 @@ struct GetAllPathsResult {
4: required list<bool> underAlignedEntity
}
+struct MeasurementSchemaRequest {
+ 1: required long queryId
+ 2: required RaftNode header
+ 3: required Node requester
+ 4: required binary planBinary
+}
+
service RaftService {
/**
@@ -419,7 +426,7 @@ service TSDataService extends RaftService {
set<string> getChildNodePathInNextLevel(1: RaftNode header, 2: string path)
- binary getAllMeasurementSchema(1: RaftNode header, 2: binary planBinary)
+ binary getAllMeasurementSchema(1:MeasurementSchemaRequest request)
list<binary> getAggrResult(1:GetAggrResultRequest request)