You are viewing a plain-text version of this content. The canonical link for it (a hyperlink on the original archive page) was not preserved in this copy.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/06 08:29:13 UTC
[iotdb] branch master updated: [IOTDB-3656] mpp load supports clean up (#7526)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 935bf4297e [IOTDB-3656] mpp load supports clean up (#7526)
935bf4297e is described below
commit 935bf4297e60498913bd47d98820613d7a5afc1d
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Thu Oct 6 16:29:07 2022 +0800
[IOTDB-3656] mpp load supports clean up (#7526)
---
.../apache/iotdb/db/engine/StorageEngineV2.java | 18 ++-
.../iotdb/db/engine/load/LoadTsFileManager.java | 123 ++++++++++++++-------
.../scheduler/load/LoadTsFileDispatcherImpl.java | 38 ++++---
.../plan/scheduler/load/LoadTsFileScheduler.java | 20 +++-
4 files changed, 139 insertions(+), 60 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 3ab8bc6bbb..0033d1b963 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -746,18 +746,24 @@ public class StorageEngineV2 implements IService {
switch (loadCommand) {
case EXECUTE:
if (loadTsFileManager.loadAll(uuid)) {
- status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ status = RpcUtils.SUCCESS_STATUS;
} else {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
- status.setMessage(String.format("No uuid %s recorded.", uuid));
+ status.setMessage(
+ String.format(
+ "No load TsFile uuid %s recorded for execute load command %s.",
+ uuid, loadCommand));
}
break;
case ROLLBACK:
if (loadTsFileManager.deleteAll(uuid)) {
- status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ status = RpcUtils.SUCCESS_STATUS;
} else {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
- status.setMessage(String.format("No uuid %s recorded.", uuid));
+ status.setMessage(
+ String.format(
+ "No load TsFile uuid %s recorded for execute load command %s.",
+ uuid, loadCommand));
}
break;
default:
@@ -765,14 +771,16 @@ public class StorageEngineV2 implements IService {
status.setMessage(String.format("Wrong load command %s.", loadCommand));
}
} catch (IOException e) {
+ logger.error(String.format("Execute load command %s error.", loadCommand), e);
status.setCode(TSStatusCode.DATA_REGION_ERROR.getStatusCode());
status.setMessage(e.getMessage());
} catch (LoadFileException e) {
+ logger.error(String.format("Execute load command %s error.", loadCommand), e);
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
status.setMessage(e.getMessage());
}
- return RpcUtils.SUCCESS_STATUS;
+ return status;
}
static class InstanceHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
index e9d72488a9..0998125863 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
@@ -20,8 +20,10 @@
package org.apache.iotdb.db.engine.load;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
@@ -29,6 +31,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.PageException;
@@ -45,7 +48,9 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
/**
* {@link LoadTsFileManager} is used for dealing with {@link LoadTsFilePieceNode} and {@link
@@ -60,23 +65,50 @@ public class LoadTsFileManager {
private final File loadDir;
private final Map<String, TsFileWriterManager> uuid2WriterManager;
- private final Map<String, Integer> dataPartition2NextTsFileIndex;
- private final ReentrantLock lock;
+ private final ScheduledExecutorService cleanupExecutors;
+ private final Map<String, ScheduledFuture<?>> uuid2Future;
public LoadTsFileManager() {
this.loadDir = SystemFileFactory.INSTANCE.getFile(config.getLoadTsFileDir());
this.uuid2WriterManager = new ConcurrentHashMap<>();
- this.dataPartition2NextTsFileIndex = new HashMap<>();
- this.lock = new ReentrantLock();
+ this.cleanupExecutors =
+ IoTDBThreadPoolFactory.newScheduledThreadPool(0, LoadTsFileManager.class.getName());
+ this.uuid2Future = new ConcurrentHashMap<>();
- if (loadDir.delete()) {
- logger.info(String.format("Delete origin load TsFile dir %s.", loadDir.getPath()));
+ recover();
+ }
+
+ private void recover() {
+ if (!loadDir.exists()) {
+ return;
+ }
+
+ for (File taskDir : loadDir.listFiles()) {
+ String uuid = taskDir.getName();
+ TsFileWriterManager writerManager = new TsFileWriterManager(taskDir);
+
+ uuid2WriterManager.put(uuid, writerManager);
+ writerManager.close();
+ uuid2Future.put(
+ uuid,
+ cleanupExecutors.schedule(
+ () -> forceCloseWriterManager(uuid),
+ LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND,
+ TimeUnit.SECONDS));
}
}
public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNode, String uuid)
throws PageException, IOException {
+ if (!uuid2WriterManager.containsKey(uuid)) {
+ uuid2Future.put(
+ uuid,
+ cleanupExecutors.schedule(
+ () -> forceCloseWriterManager(uuid),
+ LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND,
+ TimeUnit.SECONDS));
+ }
TsFileWriterManager writerManager =
uuid2WriterManager.computeIfAbsent(
uuid, o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
@@ -91,9 +123,7 @@ public class LoadTsFileManager {
return false;
}
uuid2WriterManager.get(uuid).loadAll();
- uuid2WriterManager.get(uuid).close();
- uuid2WriterManager.remove(uuid);
- clean();
+ clean(uuid);
return true;
}
@@ -101,27 +131,26 @@ public class LoadTsFileManager {
if (!uuid2WriterManager.containsKey(uuid)) {
return false;
}
- uuid2WriterManager.get(uuid).close();
- uuid2WriterManager.remove(uuid);
- clean();
+ clean(uuid);
return true;
}
- private String getNewTsFileName(String dataPartition) {
- lock.lock();
- try {
- int nextIndex = dataPartition2NextTsFileIndex.getOrDefault(dataPartition, 0) + 1;
- dataPartition2NextTsFileIndex.put(dataPartition, nextIndex);
- return dataPartition
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + nextIndex
- + TsFileConstant.TSFILE_SUFFIX;
- } finally {
- lock.unlock();
+ private void clean(String uuid) {
+ uuid2WriterManager.get(uuid).close();
+ uuid2WriterManager.remove(uuid);
+ uuid2Future.get(uuid).cancel(true);
+ uuid2Future.remove(uuid);
+
+ if (loadDir.delete()) { // this method will check if there sub-dir in this dir.
+ logger.info(String.format("Delete load dir %s.", loadDir.getPath()));
}
}
- private void clean() {
+ private void forceCloseWriterManager(String uuid) {
+ uuid2WriterManager.get(uuid).close();
+ uuid2WriterManager.remove(uuid);
+ uuid2Future.remove(uuid);
+
if (loadDir.delete()) { // this method will check if there sub-dir in this dir.
logger.info(String.format("Delete load dir %s.", loadDir.getPath()));
}
@@ -131,28 +160,32 @@ public class LoadTsFileManager {
private final File taskDir;
private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
private Map<DataPartitionInfo, String> dataPartition2LastDevice;
+ private boolean isClosed;
private TsFileWriterManager(File taskDir) {
this.taskDir = taskDir;
this.dataPartition2Writer = new HashMap<>();
this.dataPartition2LastDevice = new HashMap<>();
+ this.isClosed = false;
clearDir(taskDir);
}
private void clearDir(File dir) {
- if (dir.delete()) {
- logger.info(String.format("Delete origin load TsFile dir %s.", dir.getPath()));
- }
- if (!dir.mkdirs()) {
- logger.warn(String.format("load TsFile dir %s can not be created.", dir.getPath()));
+ FileUtils.deleteDirectory(dir);
+ if (dir.mkdirs()) {
+ logger.info(String.format("Load TsFile dir %s is created.", dir.getPath()));
}
}
private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws IOException {
+ if (isClosed) {
+ throw new IOException(String.format("%s TsFileWriterManager has been closed.", taskDir));
+ }
if (!dataPartition2Writer.containsKey(partitionInfo)) {
File newTsFile =
- SystemFileFactory.INSTANCE.getFile(taskDir, getNewTsFileName(partitionInfo.toString()));
+ SystemFileFactory.INSTANCE.getFile(
+ taskDir, partitionInfo.toString() + TsFileConstant.TSFILE_SUFFIX);
if (!newTsFile.createNewFile()) {
logger.error(String.format("Can not create TsFile %s for writing.", newTsFile.getPath()));
return;
@@ -172,6 +205,9 @@ public class LoadTsFileManager {
}
private void loadAll() throws IOException, LoadFileException {
+ if (isClosed) {
+ throw new IOException(String.format("%s TsFileWriterManager has been closed.", taskDir));
+ }
for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
TsFileIOWriter writer = entry.getValue();
if (writer.isWritingChunkGroup()) {
@@ -199,18 +235,31 @@ public class LoadTsFileManager {
return tsFileResource;
}
- private void close() throws IOException {
- for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
- TsFileIOWriter writer = entry.getValue();
- if (writer.canWrite()) {
- entry.getValue().close();
+ private void close() {
+ if (dataPartition2Writer != null) {
+ for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
+ try {
+ TsFileIOWriter writer = entry.getValue();
+ if (writer.canWrite()) {
+ writer.close();
+ }
+ if (!writer.getFile().delete()) {
+ logger.warn(String.format("Delete File %s error.", writer.getFile()));
+ }
+ } catch (IOException e) {
+ logger.warn(
+ String.format(
+ "Close TsFileIOWriter %s error.", entry.getValue().getFile().getPath()),
+ e);
+ }
}
}
if (!taskDir.delete()) {
- logger.warn(String.format("Can not delete load uuid dir %s.", taskDir.getPath()));
+ logger.warn(String.format("Can not delete load dir %s.", taskDir.getPath()));
}
dataPartition2Writer = null;
dataPartition2LastDevice = null;
+ isClosed = true;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 1b76194d04..a84a914d6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -53,6 +54,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import static com.google.common.util.concurrent.Futures.immediateFuture;
@@ -65,12 +67,15 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
private final int localhostInternalPort;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager;
+ private final ExecutorService executor;
public LoadTsFileDispatcherImpl(
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
this.internalServiceClientManager = internalServiceClientManager;
this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
this.localhostInternalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+ this.executor =
+ IoTDBThreadPoolFactory.newCachedThreadPool(LoadTsFileDispatcherImpl.class.getName());
}
public void setUuid(String uuid) {
@@ -79,21 +84,24 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
@Override
public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
- for (FragmentInstance instance : instances) {
- try (SetThreadName threadName =
- new SetThreadName(LoadTsFileScheduler.class.getName() + instance.getId().getFullId())) {
- dispatchOneInstance(instance);
- } catch (FragmentInstanceDispatchException e) {
- return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
- } catch (Throwable t) {
- logger.error("cannot dispatch FI for load operation", t);
- return immediateFuture(
- new FragInstanceDispatchResult(
- RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
- }
- }
- return immediateFuture(new FragInstanceDispatchResult(true));
+ return executor.submit(
+ () -> {
+ for (FragmentInstance instance : instances) {
+ try (SetThreadName threadName =
+ new SetThreadName(
+ LoadTsFileScheduler.class.getName() + instance.getId().getFullId())) {
+ dispatchOneInstance(instance);
+ } catch (FragmentInstanceDispatchException e) {
+ return new FragInstanceDispatchResult(e.getFailureStatus());
+ } catch (Throwable t) {
+ logger.error("cannot dispatch FI for load operation", t);
+ return new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()));
+ }
+ }
+ return new FragInstanceDispatchResult(true);
+ });
}
private void dispatchOneInstance(FragmentInstance instance)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index 2a795780ed..dd403d46ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -51,8 +51,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* {@link LoadTsFileScheduler} is used for scheduling {@link LoadSingleTsFileNode} and {@link
@@ -62,10 +65,12 @@ import java.util.concurrent.Future;
* href="https://apache-iotdb.feishu.cn/docx/doxcnyBYWzek8ksSEU6obZMpYLe">...</a>;
*/
public class LoadTsFileScheduler implements IScheduler {
+ public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 5184000L; // one day
+
private static final Logger logger = LoggerFactory.getLogger(LoadTsFileScheduler.class);
private final MPPQueryContext queryContext;
- private QueryStateMachine stateMachine;
+ private final QueryStateMachine stateMachine;
private LoadTsFileDispatcherImpl dispatcher;
private List<LoadSingleTsFileNode> tsFileNodeList;
private PlanFragmentId fragmentId;
@@ -144,7 +149,9 @@ public class LoadTsFileScheduler implements IScheduler {
dispatcher.dispatch(Collections.singletonList(instance));
try {
- FragInstanceDispatchResult result = dispatchResultFuture.get();
+ FragInstanceDispatchResult result =
+ dispatchResultFuture.get(
+ LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND, TimeUnit.SECONDS);
if (!result.isSuccessful()) {
// TODO: retry.
logger.error(
@@ -166,13 +173,20 @@ public class LoadTsFileScheduler implements IScheduler {
stateMachine.transitionToFailed(result.getFailureStatus()); // TODO: record more status
return false;
}
- } catch (InterruptedException | ExecutionException e) {
+ } catch (InterruptedException | ExecutionException | CancellationException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
logger.warn("Interrupt or Execution error.", e);
stateMachine.transitionToFailed(e);
return false;
+ } catch (TimeoutException e) {
+ dispatchResultFuture.cancel(true);
+ logger.error(
+ String.format("Wait for loading %s time out.", LoadTsFilePieceNode.class.getName()),
+ e);
+ stateMachine.transitionToFailed(e);
+ return false;
}
}
}