You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/15 12:19:05 UTC
[iotdb] 01/01: fix ci
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch stable-pp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3e056a6a89f1adf252c5d166f52eca5093d4e97a
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Apr 15 20:18:45 2022 +0800
fix ci
---
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 4 +-
.../apache/iotdb/db/mpp/buffer/SourceHandle.java | 2 -
.../apache/iotdb/db/mpp/buffer/StubSinkHandle.java | 6 ++-
.../apache/iotdb/db/mpp/execution/Coordinator.java | 23 +++++++++--
.../apache/iotdb/db/mpp/execution/DataDriver.java | 3 ++
.../iotdb/db/mpp/execution/QueryExecution.java | 22 ++++++++--
.../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 11 ++---
.../db/mpp/sql/analyze/ClusterSchemaFetcher.java | 3 +-
.../thrift/impl/DataNodeTSIServiceImpl.java | 48 ++++++++++++++++++----
.../iotdb/db/mpp/sql/analyze/AnalyzerTest.java | 6 ++-
.../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java | 5 ++-
.../iotdb/db/mpp/sql/plan/QueryPlannerTest.java | 6 ++-
12 files changed, 107 insertions(+), 32 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index 96802c3bd5..2934a6bf7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -108,8 +108,8 @@ public class SinkHandle implements ISinkHandle {
// TODO: (xingtanzjr)
// We temporarily make it sync instead of async to prevent the EOS event (sent when the SinkHandle
// close() method is called) from being sent before the NewDataBlockEvent has arrived
- new SendNewDataBlockEventTask(startSequenceId, blockSizes).run();
- // executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
+ // new SendNewDataBlockEventTask(startSequenceId, blockSizes).run();
+ executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index af6e715a16..b577174702 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -212,8 +212,6 @@ public class SourceHandle implements ISourceHandle {
synchronized void setNoMoreTsBlocks(int lastSequenceId) {
this.lastSequenceId = lastSequenceId;
noMoreTsBlocks = true;
- // someone may be waiting for this blocked, so here we need to notify it
- blocked.set(null);
}
synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index dc09a1c038..7753bc3ff1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -34,6 +34,8 @@ public class StubSinkHandle implements ISinkHandle {
private final List<TsBlock> tsBlocks = new ArrayList<>();
+ private boolean closed = false;
+
@Override
public long getBufferRetainedSizeInBytes() {
return 0;
@@ -64,7 +66,7 @@ public class StubSinkHandle implements ISinkHandle {
@Override
public boolean isClosed() {
- return false;
+ return closed;
}
@Override
@@ -74,7 +76,7 @@ public class StubSinkHandle implements ISinkHandle {
@Override
public void close() {
- tsBlocks.clear();
+ closed = true;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 4bb6bfbf58..d92368cee4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
@@ -64,20 +66,33 @@ public class Coordinator {
this.scheduledExecutor = getScheduledExecutor();
}
- private IQueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
+ private IQueryExecution createQueryExecution(
+ Statement statement,
+ MPPQueryContext queryContext,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
if (statement instanceof SetStorageGroupStatement) {
queryContext.setQueryType(QueryType.WRITE);
return new ConfigExecution(queryContext, statement, executor);
}
- return new QueryExecution(statement, queryContext, executor, scheduledExecutor);
+ return new QueryExecution(
+ statement, queryContext, executor, scheduledExecutor, partitionFetcher, schemaFetcher);
}
public ExecutionResult execute(
- Statement statement, QueryId queryId, SessionInfo session, String sql) {
+ Statement statement,
+ QueryId queryId,
+ SessionInfo session,
+ String sql,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
IQueryExecution execution =
createQueryExecution(
- statement, new MPPQueryContext(sql, queryId, session, getHostEndpoint()));
+ statement,
+ new MPPQueryContext(sql, queryId, session, getHostEndpoint()),
+ partitionFetcher,
+ schemaFetcher);
queryExecutionMap.put(queryId, execution);
execution.start();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index ce6a513080..88c160dd28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -155,6 +155,9 @@ public class DataDriver implements Driver {
@Override
public void close() {
+ if (closed) {
+ return;
+ }
closed = true;
try {
if (root != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 99241305d9..e7df772ad9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
@@ -70,6 +72,10 @@ public class QueryExecution implements IQueryExecution {
private final ExecutorService executor;
private final ScheduledExecutorService scheduledExecutor;
+ // TODO need to use factory to decide standalone or cluster
+ private final IPartitionFetcher partitionFetcher;
+ // TODO need to use factory to decide standalone or cluster
+ private final ISchemaFetcher schemaFetcher;
// The result of QueryExecution will be written to the DataBlockManager in current Node.
// We use this SourceHandle to fetch the TsBlock from it.
@@ -79,13 +85,17 @@ public class QueryExecution implements IQueryExecution {
Statement statement,
MPPQueryContext context,
ExecutorService executor,
- ScheduledExecutorService scheduledExecutor) {
+ ScheduledExecutorService scheduledExecutor,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.context = context;
this.planOptimizers = new ArrayList<>();
- this.analysis = analyze(statement, context);
+ this.analysis = analyze(statement, context, partitionFetcher, schemaFetcher);
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
+ this.partitionFetcher = partitionFetcher;
+ this.schemaFetcher = schemaFetcher;
// TODO: (xingtanzjr) Initialize the result handle after the DataBlockManager is merged.
// resultHandle = xxxx
@@ -110,9 +120,13 @@ public class QueryExecution implements IQueryExecution {
}
// Analyze the statement in QueryContext. Generate the analysis this query needs
- private static Analysis analyze(Statement statement, MPPQueryContext context) {
+ private static Analysis analyze(
+ Statement statement,
+ MPPQueryContext context,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
// initialize the variable `analysis`
- return new Analyzer(context).analyze(statement);
+ return new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement);
}
private void schedule() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index d8fcd29210..7c783694c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -70,13 +70,14 @@ public class Analyzer {
private final MPPQueryContext context;
- // TODO need to use factory to decide standalone or cluster
- private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
- // TODO need to use factory to decide standalone or cluster
- private final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
+ private final IPartitionFetcher partitionFetcher;
+ private final ISchemaFetcher schemaFetcher;
- public Analyzer(MPPQueryContext context) {
+ public Analyzer(
+ MPPQueryContext context, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
this.context = context;
+ this.partitionFetcher = partitionFetcher;
+ this.schemaFetcher = schemaFetcher;
}
public Analysis analyze(Statement statement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index ca7f1f06cb..783372eca6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -71,7 +71,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
QueryId queryId =
new QueryId(String.valueOf(SessionManager.getInstance().requestQueryId(false)));
- ExecutionResult executionResult = coordinator.execute(schemaFetchStatement, queryId, null, "");
+ ExecutionResult executionResult =
+ coordinator.execute(schemaFetchStatement, queryId, null, "", partitionFetcher, this);
// TODO: (xingtanzjr) throw exception
if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException("cannot fetch schema");
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index f3cfdf5bf2..4b86aca612 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -27,6 +27,10 @@ import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.Coordinator;
import org.apache.iotdb.db.mpp.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
@@ -70,6 +74,10 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+ private static final IPartitionFetcher PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+
+ private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
@@ -216,7 +224,13 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
QueryId id = new QueryId(String.valueOf(queryId));
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(s, id, SESSION_MANAGER.getSessionInfo(req.sessionId), statement);
+ COORDINATOR.execute(
+ s,
+ id,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ statement,
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException("");
@@ -316,7 +330,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -357,7 +373,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -398,7 +416,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -435,7 +455,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
@@ -467,7 +489,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -500,7 +524,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan,
@@ -538,7 +564,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
@@ -672,7 +700,9 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
statement,
new QueryId(String.valueOf(queryId)),
SESSION_MANAGER.getSessionInfo(req.sessionId),
- "");
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
// TODO(INSERT) do this check in analyze
// TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
index d8d1880232..4437a5551e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
@@ -48,7 +48,11 @@ public class AnalyzerTest {
private void assertAnalyzeSemanticException(String sql, String message) {
try {
- Analyzer analyzer = new Analyzer(new MPPQueryContext(new QueryId("test_query")));
+ Analyzer analyzer =
+ new Analyzer(
+ new MPPQueryContext(new QueryId("test_query")),
+ new FakePartitionFetcherImpl(),
+ new FakeSchemaFetcherImpl());
analyzer.analyze(StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()));
fail();
} catch (RuntimeException e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index 73001f41b3..94e5f23a38 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.FakePartitionFetcherImpl;
+import org.apache.iotdb.db.mpp.sql.analyze.FakeSchemaFetcherImpl;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -868,7 +870,8 @@ public class LogicalPlannerTest {
Statement statement =
StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
- Analyzer analyzer = new Analyzer(context);
+ Analyzer analyzer =
+ new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
Analysis analysis = analyzer.analyze(statement);
LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
planNode = planner.plan(analysis).getRootNode();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index ca4d50f542..a0b28041d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.execution.QueryExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.FakePartitionFetcherImpl;
+import org.apache.iotdb.db.mpp.sql.analyze.FakeSchemaFetcherImpl;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
@@ -50,7 +52,9 @@ public class QueryPlannerTest {
stmt,
new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), new Endpoint()),
IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"));
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
+ new FakePartitionFetcherImpl(),
+ new FakeSchemaFetcherImpl());
queryExecution.doLogicalPlan();
System.out.printf("SQL: %s%n%n", querySql);
System.out.println("===== Step 1: Logical Plan =====");