You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/08 10:06:10 UTC

[iotdb] branch ReduceContextSize1.1 created (now 2156791420)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch ReduceContextSize1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 2156791420 Set some runtime fields in FIContext to null when the FI is done

This branch includes the following new commits:

     new 2156791420 Set some runtime fields in FIContext to null when the FI is done

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Set some runtime fields in FIContext to null when the FI is done

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ReduceContextSize1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 21567914202fded89f3c8a63a4586babb96beb74
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Mar 8 18:03:17 2023 +0800

    Set some runtime fields in FIContext to null when the FI is done
    
    (cherry picked from commit 863bc15c189ae3bd27eb79c9783a05d3c64c1367)
---
 .../iotdb/db/mpp/execution/driver/DataDriver.java   |  6 ++++++
 .../execution/fragment/FragmentInstanceContext.java |  8 ++++++--
 .../fragment/FragmentInstanceExecution.java         |  4 ++++
 .../execution/fragment/FragmentInstanceManager.java | 21 ++++++++++-----------
 4 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index 287f034954..d7378a67b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -71,6 +71,12 @@ public class DataDriver extends Driver {
           ((DataDriverContext) driverContext).getSourceOperators();
       if (sourceOperators != null && !sourceOperators.isEmpty()) {
         QueryDataSource dataSource = initQueryDataSource();
+        if (dataSource == null) {
+          // If the whole FI is aborted or cancelled for some reason while this driver is
+          // still being initialized, initQueryDataSource() may return null here.
+          // In that case it is safe to throw this exception.
+          throw new IllegalStateException("QueryDataSource should never be null!");
+        }
         sourceOperators.forEach(
             sourceOperator -> {
               // construct QueryDataSource for source operator
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 4d6c80233a..f810bc1ddb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -55,7 +55,7 @@ public class FragmentInstanceContext extends QueryContext {
 
   private IDataRegionForQuery dataRegion;
   private Filter timeFilter;
-  List<PartialPath> sourcePaths;
+  private List<PartialPath> sourcePaths;
   // Shared by all scan operators in this fragment instance to avoid memory problem
   private QueryDataSource sharedQueryDataSource;
   /** closed tsfile used in this fragment instance */
@@ -354,7 +354,7 @@ public class FragmentInstanceContext extends QueryContext {
    * All file paths used by this fragment instance must be cleared and thus the usage reference must
    * be decreased.
    */
-  protected void releaseResource() {
+  protected synchronized void releaseResource() {
     for (TsFileResource tsFile : closedFilePaths) {
       FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
     }
@@ -363,5 +363,9 @@ public class FragmentInstanceContext extends QueryContext {
       FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false);
     }
     unClosedFilePaths = null;
+    dataRegion = null;
+    timeFilter = null;
+    sourcePaths = null;
+    sharedQueryDataSource = null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 2415bed270..f9c17c9963 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -99,6 +99,10 @@ public class FragmentInstanceExecution {
         context.getFailureInfoList());
   }
 
+  public long getStartTime() {
+    return context.getStartTime();
+  }
+
   public FragmentInstanceStateMachine getStateMachine() {
     return stateMachine;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 4c4dd4187b..6e5390fdc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -92,12 +92,12 @@ public class FragmentInstanceManager {
     this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
 
     ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-        instanceManagementExecutor, this::removeOldInstances, 200, 200, TimeUnit.MILLISECONDS);
+        instanceManagementExecutor, this::removeOldInstances, 2000, 2000, TimeUnit.MILLISECONDS);
     ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
         instanceManagementExecutor,
         this::cancelTimeoutFlushingInstances,
-        200,
-        200,
+        2000,
+        2000,
         TimeUnit.MILLISECONDS);
 
     this.intoOperationExecutor =
@@ -297,14 +297,13 @@ public class FragmentInstanceManager {
 
   private void cancelTimeoutFlushingInstances() {
     long now = System.currentTimeMillis();
-    instanceContext.entrySet().stream()
-        .filter(
-            entry -> {
-              FragmentInstanceContext context = entry.getValue();
-              return context.getStateMachine().getState() == FragmentInstanceState.FLUSHING
-                  && (now - context.getStartTime()) > QUERY_TIMEOUT_MS;
-            })
-        .forEach(entry -> entry.getValue().failed(new TimeoutException()));
+    instanceExecution.forEach(
+        (key, execution) -> {
+          if (execution.getStateMachine().getState() == FragmentInstanceState.FLUSHING
+              && (now - execution.getStartTime()) > QUERY_TIMEOUT_MS) {
+            execution.getStateMachine().failed(new TimeoutException());
+          }
+        });
   }
 
   public ExecutorService getIntoOperationExecutor() {