You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/06 08:29:13 UTC

[iotdb] branch master updated: [IOTDB-3656] mpp load supports clean up (#7526)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 935bf4297e [IOTDB-3656] mpp load supports clean up (#7526)
935bf4297e is described below

commit 935bf4297e60498913bd47d98820613d7a5afc1d
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Thu Oct 6 16:29:07 2022 +0800

    [IOTDB-3656] mpp load supports clean up (#7526)
---
 .../apache/iotdb/db/engine/StorageEngineV2.java    |  18 ++-
 .../iotdb/db/engine/load/LoadTsFileManager.java    | 123 ++++++++++++++-------
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |  38 ++++---
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  20 +++-
 4 files changed, 139 insertions(+), 60 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 3ab8bc6bbb..0033d1b963 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -746,18 +746,24 @@ public class StorageEngineV2 implements IService {
       switch (loadCommand) {
         case EXECUTE:
           if (loadTsFileManager.loadAll(uuid)) {
-            status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+            status = RpcUtils.SUCCESS_STATUS;
           } else {
             status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
-            status.setMessage(String.format("No uuid %s recorded.", uuid));
+            status.setMessage(
+                String.format(
+                    "No load TsFile uuid %s recorded for execute load command %s.",
+                    uuid, loadCommand));
           }
           break;
         case ROLLBACK:
           if (loadTsFileManager.deleteAll(uuid)) {
-            status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+            status = RpcUtils.SUCCESS_STATUS;
           } else {
             status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
-            status.setMessage(String.format("No uuid %s recorded.", uuid));
+            status.setMessage(
+                String.format(
+                    "No load TsFile uuid %s recorded for execute load command %s.",
+                    uuid, loadCommand));
           }
           break;
         default:
@@ -765,14 +771,16 @@ public class StorageEngineV2 implements IService {
           status.setMessage(String.format("Wrong load command %s.", loadCommand));
       }
     } catch (IOException e) {
+      logger.error(String.format("Execute load command %s error.", loadCommand), e);
       status.setCode(TSStatusCode.DATA_REGION_ERROR.getStatusCode());
       status.setMessage(e.getMessage());
     } catch (LoadFileException e) {
+      logger.error(String.format("Execute load command %s error.", loadCommand), e);
       status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
       status.setMessage(e.getMessage());
     }
 
-    return RpcUtils.SUCCESS_STATUS;
+    return status;
   }
 
   static class InstanceHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
index e9d72488a9..0998125863 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
@@ -20,8 +20,10 @@
 package org.apache.iotdb.db.engine.load;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
@@ -29,6 +31,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.PageException;
@@ -45,7 +48,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * {@link LoadTsFileManager} is used for dealing with {@link LoadTsFilePieceNode} and {@link
@@ -60,23 +65,50 @@ public class LoadTsFileManager {
   private final File loadDir;
 
   private final Map<String, TsFileWriterManager> uuid2WriterManager;
-  private final Map<String, Integer> dataPartition2NextTsFileIndex;
 
-  private final ReentrantLock lock;
+  private final ScheduledExecutorService cleanupExecutors;
+  private final Map<String, ScheduledFuture<?>> uuid2Future;
 
   public LoadTsFileManager() {
     this.loadDir = SystemFileFactory.INSTANCE.getFile(config.getLoadTsFileDir());
     this.uuid2WriterManager = new ConcurrentHashMap<>();
-    this.dataPartition2NextTsFileIndex = new HashMap<>();
-    this.lock = new ReentrantLock();
+    this.cleanupExecutors =
+        IoTDBThreadPoolFactory.newScheduledThreadPool(0, LoadTsFileManager.class.getName());
+    this.uuid2Future = new ConcurrentHashMap<>();
 
-    if (loadDir.delete()) {
-      logger.info(String.format("Delete origin load TsFile dir %s.", loadDir.getPath()));
+    recover();
+  }
+
+  private void recover() {
+    if (!loadDir.exists()) {
+      return;
+    }
+
+    for (File taskDir : loadDir.listFiles()) {
+      String uuid = taskDir.getName();
+      TsFileWriterManager writerManager = new TsFileWriterManager(taskDir);
+
+      uuid2WriterManager.put(uuid, writerManager);
+      writerManager.close();
+      uuid2Future.put(
+          uuid,
+          cleanupExecutors.schedule(
+              () -> forceCloseWriterManager(uuid),
+              LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND,
+              TimeUnit.SECONDS));
     }
   }
 
   public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNode, String uuid)
       throws PageException, IOException {
+    if (!uuid2WriterManager.containsKey(uuid)) {
+      uuid2Future.put(
+          uuid,
+          cleanupExecutors.schedule(
+              () -> forceCloseWriterManager(uuid),
+              LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND,
+              TimeUnit.SECONDS));
+    }
     TsFileWriterManager writerManager =
         uuid2WriterManager.computeIfAbsent(
             uuid, o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
@@ -91,9 +123,7 @@ public class LoadTsFileManager {
       return false;
     }
     uuid2WriterManager.get(uuid).loadAll();
-    uuid2WriterManager.get(uuid).close();
-    uuid2WriterManager.remove(uuid);
-    clean();
+    clean(uuid);
     return true;
   }
 
@@ -101,27 +131,26 @@ public class LoadTsFileManager {
     if (!uuid2WriterManager.containsKey(uuid)) {
       return false;
     }
-    uuid2WriterManager.get(uuid).close();
-    uuid2WriterManager.remove(uuid);
-    clean();
+    clean(uuid);
     return true;
   }
 
-  private String getNewTsFileName(String dataPartition) {
-    lock.lock();
-    try {
-      int nextIndex = dataPartition2NextTsFileIndex.getOrDefault(dataPartition, 0) + 1;
-      dataPartition2NextTsFileIndex.put(dataPartition, nextIndex);
-      return dataPartition
-          + IoTDBConstant.FILE_NAME_SEPARATOR
-          + nextIndex
-          + TsFileConstant.TSFILE_SUFFIX;
-    } finally {
-      lock.unlock();
+  private void clean(String uuid) {
+    uuid2WriterManager.get(uuid).close();
+    uuid2WriterManager.remove(uuid);
+    uuid2Future.get(uuid).cancel(true);
+    uuid2Future.remove(uuid);
+
+    if (loadDir.delete()) { // delete() only succeeds when the dir is empty (no sub-dir left).
+      logger.info(String.format("Delete load dir %s.", loadDir.getPath()));
     }
   }
 
-  private void clean() {
+  private void forceCloseWriterManager(String uuid) {
+    uuid2WriterManager.get(uuid).close();
+    uuid2WriterManager.remove(uuid);
+    uuid2Future.remove(uuid);
+
     if (loadDir.delete()) { // this method will check if there sub-dir in this dir.
       logger.info(String.format("Delete load dir %s.", loadDir.getPath()));
     }
@@ -131,28 +160,32 @@ public class LoadTsFileManager {
     private final File taskDir;
     private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
     private Map<DataPartitionInfo, String> dataPartition2LastDevice;
+    private boolean isClosed;
 
     private TsFileWriterManager(File taskDir) {
       this.taskDir = taskDir;
       this.dataPartition2Writer = new HashMap<>();
       this.dataPartition2LastDevice = new HashMap<>();
+      this.isClosed = false;
 
       clearDir(taskDir);
     }
 
     private void clearDir(File dir) {
-      if (dir.delete()) {
-        logger.info(String.format("Delete origin load TsFile dir %s.", dir.getPath()));
-      }
-      if (!dir.mkdirs()) {
-        logger.warn(String.format("load TsFile dir %s can not be created.", dir.getPath()));
+      FileUtils.deleteDirectory(dir);
+      if (dir.mkdirs()) {
+        logger.info(String.format("Load TsFile dir %s is created.", dir.getPath()));
       }
     }
 
     private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws IOException {
+      if (isClosed) {
+        throw new IOException(String.format("%s TsFileWriterManager has been closed.", taskDir));
+      }
       if (!dataPartition2Writer.containsKey(partitionInfo)) {
         File newTsFile =
-            SystemFileFactory.INSTANCE.getFile(taskDir, getNewTsFileName(partitionInfo.toString()));
+            SystemFileFactory.INSTANCE.getFile(
+                taskDir, partitionInfo.toString() + TsFileConstant.TSFILE_SUFFIX);
         if (!newTsFile.createNewFile()) {
           logger.error(String.format("Can not create TsFile %s for writing.", newTsFile.getPath()));
           return;
@@ -172,6 +205,9 @@ public class LoadTsFileManager {
     }
 
     private void loadAll() throws IOException, LoadFileException {
+      if (isClosed) {
+        throw new IOException(String.format("%s TsFileWriterManager has been closed.", taskDir));
+      }
       for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
         TsFileIOWriter writer = entry.getValue();
         if (writer.isWritingChunkGroup()) {
@@ -199,18 +235,31 @@ public class LoadTsFileManager {
       return tsFileResource;
     }
 
-    private void close() throws IOException {
-      for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
-        TsFileIOWriter writer = entry.getValue();
-        if (writer.canWrite()) {
-          entry.getValue().close();
+    private void close() {
+      if (dataPartition2Writer != null) {
+        for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
+          try {
+            TsFileIOWriter writer = entry.getValue();
+            if (writer.canWrite()) {
+              writer.close();
+            }
+            if (!writer.getFile().delete()) {
+              logger.warn(String.format("Delete File %s error.", writer.getFile()));
+            }
+          } catch (IOException e) {
+            logger.warn(
+                String.format(
+                    "Close TsFileIOWriter %s error.", entry.getValue().getFile().getPath()),
+                e);
+          }
         }
       }
       if (!taskDir.delete()) {
-        logger.warn(String.format("Can not delete load uuid dir %s.", taskDir.getPath()));
+        logger.warn(String.format("Can not delete load dir %s.", taskDir.getPath()));
       }
       dataPartition2Writer = null;
       dataPartition2LastDevice = null;
+      isClosed = true;
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 1b76194d04..a84a914d6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -53,6 +54,7 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
@@ -65,12 +67,15 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
   private final int localhostInternalPort;
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       internalServiceClientManager;
+  private final ExecutorService executor;
 
   public LoadTsFileDispatcherImpl(
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.internalServiceClientManager = internalServiceClientManager;
     this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
     this.localhostInternalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+    this.executor =
+        IoTDBThreadPoolFactory.newCachedThreadPool(LoadTsFileDispatcherImpl.class.getName());
   }
 
   public void setUuid(String uuid) {
@@ -79,21 +84,24 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
 
   @Override
   public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
-    for (FragmentInstance instance : instances) {
-      try (SetThreadName threadName =
-          new SetThreadName(LoadTsFileScheduler.class.getName() + instance.getId().getFullId())) {
-        dispatchOneInstance(instance);
-      } catch (FragmentInstanceDispatchException e) {
-        return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
-      } catch (Throwable t) {
-        logger.error("cannot dispatch FI for load operation", t);
-        return immediateFuture(
-            new FragInstanceDispatchResult(
-                RpcUtils.getStatus(
-                    TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
-      }
-    }
-    return immediateFuture(new FragInstanceDispatchResult(true));
+    return executor.submit(
+        () -> {
+          for (FragmentInstance instance : instances) {
+            try (SetThreadName threadName =
+                new SetThreadName(
+                    LoadTsFileScheduler.class.getName() + instance.getId().getFullId())) {
+              dispatchOneInstance(instance);
+            } catch (FragmentInstanceDispatchException e) {
+              return new FragInstanceDispatchResult(e.getFailureStatus());
+            } catch (Throwable t) {
+              logger.error("cannot dispatch FI for load operation", t);
+              return new FragInstanceDispatchResult(
+                  RpcUtils.getStatus(
+                      TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()));
+            }
+          }
+          return new FragInstanceDispatchResult(true);
+        });
   }
 
   private void dispatchOneInstance(FragmentInstance instance)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index 2a795780ed..dd403d46ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -51,8 +51,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * {@link LoadTsFileScheduler} is used for scheduling {@link LoadSingleTsFileNode} and {@link
@@ -62,10 +65,12 @@ import java.util.concurrent.Future;
  * href="https://apache-iotdb.feishu.cn/docx/doxcnyBYWzek8ksSEU6obZMpYLe">...</a>;
  */
 public class LoadTsFileScheduler implements IScheduler {
+  public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 5184000L; // 60 days, in seconds
+
   private static final Logger logger = LoggerFactory.getLogger(LoadTsFileScheduler.class);
 
   private final MPPQueryContext queryContext;
-  private QueryStateMachine stateMachine;
+  private final QueryStateMachine stateMachine;
   private LoadTsFileDispatcherImpl dispatcher;
   private List<LoadSingleTsFileNode> tsFileNodeList;
   private PlanFragmentId fragmentId;
@@ -144,7 +149,9 @@ public class LoadTsFileScheduler implements IScheduler {
             dispatcher.dispatch(Collections.singletonList(instance));
 
         try {
-          FragInstanceDispatchResult result = dispatchResultFuture.get();
+          FragInstanceDispatchResult result =
+              dispatchResultFuture.get(
+                  LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND, TimeUnit.SECONDS);
           if (!result.isSuccessful()) {
             // TODO: retry.
             logger.error(
@@ -166,13 +173,20 @@ public class LoadTsFileScheduler implements IScheduler {
             stateMachine.transitionToFailed(result.getFailureStatus()); // TODO: record more status
             return false;
           }
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (InterruptedException | ExecutionException | CancellationException e) {
           if (e instanceof InterruptedException) {
             Thread.currentThread().interrupt();
           }
           logger.warn("Interrupt or Execution error.", e);
           stateMachine.transitionToFailed(e);
           return false;
+        } catch (TimeoutException e) {
+          dispatchResultFuture.cancel(true);
+          logger.error(
+              String.format("Wait for loading %s time out.", LoadTsFilePieceNode.class.getName()),
+              e);
+          stateMachine.transitionToFailed(e);
+          return false;
         }
       }
     }