You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/08 07:06:26 UTC
[iotdb] 01/01: Set some runtime fields in FIContext to null when the FI is done
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ReduceContextSize
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4bbdce7326101e23c5edc89f3edcdb69035e71f9
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Mar 8 15:03:57 2023 +0800
Set some runtime fields in FIContext to null when the FI is done
---
.../iotdb/db/mpp/execution/driver/DataDriver.java | 6 ++++++
.../execution/fragment/FragmentInstanceContext.java | 8 ++++++--
.../fragment/FragmentInstanceExecution.java | 4 ++++
.../execution/fragment/FragmentInstanceManager.java | 21 ++++++++++-----------
4 files changed, 26 insertions(+), 13 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index 287f034954..d7378a67b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -71,6 +71,12 @@ public class DataDriver extends Driver {
((DataDriverContext) driverContext).getSourceOperators();
if (sourceOperators != null && !sourceOperators.isEmpty()) {
QueryDataSource dataSource = initQueryDataSource();
+ if (dataSource == null) {
+ // If the whole FI is aborted or cancelled for some reason while this driver is still
+ // being initialized, we may get a null QueryDataSource here.
+ // In that case it is safe for us to throw this exception.
+ throw new IllegalStateException("QueryDataSource should never be null!");
+ }
sourceOperators.forEach(
sourceOperator -> {
// construct QueryDataSource for source operator
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 4d6c80233a..f810bc1ddb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -55,7 +55,7 @@ public class FragmentInstanceContext extends QueryContext {
private IDataRegionForQuery dataRegion;
private Filter timeFilter;
- List<PartialPath> sourcePaths;
+ private List<PartialPath> sourcePaths;
// Shared by all scan operators in this fragment instance to avoid memory problem
private QueryDataSource sharedQueryDataSource;
/** closed tsfile used in this fragment instance */
@@ -354,7 +354,7 @@ public class FragmentInstanceContext extends QueryContext {
* All file paths used by this fragment instance must be cleared and thus the usage reference must
* be decreased.
*/
- protected void releaseResource() {
+ protected synchronized void releaseResource() {
for (TsFileResource tsFile : closedFilePaths) {
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
}
@@ -363,5 +363,9 @@ public class FragmentInstanceContext extends QueryContext {
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false);
}
unClosedFilePaths = null;
+ dataRegion = null;
+ timeFilter = null;
+ sourcePaths = null;
+ sharedQueryDataSource = null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 2415bed270..f9c17c9963 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -99,6 +99,10 @@ public class FragmentInstanceExecution {
context.getFailureInfoList());
}
+ public long getStartTime() {
+ return context.getStartTime();
+ }
+
public FragmentInstanceStateMachine getStateMachine() {
return stateMachine;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 4c4dd4187b..6e5390fdc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -92,12 +92,12 @@ public class FragmentInstanceManager {
this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- instanceManagementExecutor, this::removeOldInstances, 200, 200, TimeUnit.MILLISECONDS);
+ instanceManagementExecutor, this::removeOldInstances, 2000, 2000, TimeUnit.MILLISECONDS);
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
instanceManagementExecutor,
this::cancelTimeoutFlushingInstances,
- 200,
- 200,
+ 2000,
+ 2000,
TimeUnit.MILLISECONDS);
this.intoOperationExecutor =
@@ -297,14 +297,13 @@ public class FragmentInstanceManager {
private void cancelTimeoutFlushingInstances() {
long now = System.currentTimeMillis();
- instanceContext.entrySet().stream()
- .filter(
- entry -> {
- FragmentInstanceContext context = entry.getValue();
- return context.getStateMachine().getState() == FragmentInstanceState.FLUSHING
- && (now - context.getStartTime()) > QUERY_TIMEOUT_MS;
- })
- .forEach(entry -> entry.getValue().failed(new TimeoutException()));
+ instanceExecution.forEach(
+ (key, execution) -> {
+ if (execution.getStateMachine().getState() == FragmentInstanceState.FLUSHING
+ && (now - execution.getStartTime()) > QUERY_TIMEOUT_MS) {
+ execution.getStateMachine().failed(new TimeoutException());
+ }
+ });
}
public ExecutorService getIntoOperationExecutor() {