You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/08/17 08:38:00 UTC

[GitHub] [iotdb] HeimingZ commented on a diff in pull request #6989: [To rel/0.13][IOTDB-3882] Cold Data Separation

HeimingZ commented on code in PR #6989:
URL: https://github.com/apache/iotdb/pull/6989#discussion_r947560488


##########
server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationLogWriter.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.metadata.logfile.MLogTxtWriter;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/** MigrationLog writes the binary logs of MigrationTask into file using FileOutputStream */
+public class MigrationLogWriter implements AutoCloseable {
+  private static final Logger logger = LoggerFactory.getLogger(MLogTxtWriter.class);
+  private final File logFile;
+  private FileOutputStream logFileOutStream;
+
+  public MigrationLogWriter(String logFileName) throws FileNotFoundException {
+    logFile = SystemFileFactory.INSTANCE.getFile(logFileName);
+    if (!logFile.exists()) {
+      if (logFile.getParentFile() != null) {
+        if (logFile.getParentFile().mkdirs()) {
+          logger.info("created migrate log folder");
+        } else {
+          logger.info("create migrate log folder failed");
+        }
+      }
+    }
+    logFileOutStream = new FileOutputStream(logFile, true);
+  }
+
+  public MigrationLogWriter(File logFile) throws FileNotFoundException {
+    this.logFile = logFile;
+    logFileOutStream = new FileOutputStream(logFile, true);
+  }

Review Comment:
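   Why doesn't the other constructor also check that the parent directory exists? It would be better to move the check into MigrationLogWriter(File logFile) and have MigrationLogWriter(String logFileName) delegate to it via this(SystemFileFactory.INSTANCE.getFile(logFileName)), so both constructors create a missing parent directory and the file is still resolved through SystemFileFactory.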



##########
server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationManager.java:
##########
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.migration.MigrationLogWriter.MigrationLog;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * MigrationManager keep tracks of all Migration Tasks, creates the threads to check/run the
+ * MigrationTask.
+ */
+public class MigrationManager {
+  private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
+  protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private MigrationLogWriter logWriter;
+
+  // taskId -> MigrationTask
+  private ConcurrentHashMap<Long, MigrationTask> migrationTasks = new ConcurrentHashMap<>();
+  // the current largest MigrationTask id + 1, used to create new tasks
+  private long currentTaskId = 0;
+  // single thread to iterate through migrationTasks and check start
+  private ScheduledExecutorService migrationCheckThread;
+  // multiple threads to run the tasks
+  private ExecutorService migrationTaskThreadPool;

Review Comment:
   Please shut down these thread pools (migrationCheckThread and migrationTaskThreadPool) when the system exits.



##########
server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationLogWriter.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.metadata.logfile.MLogTxtWriter;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/** MigrationLog writes the binary logs of MigrationTask into file using FileOutputStream */
+public class MigrationLogWriter implements AutoCloseable {
+  private static final Logger logger = LoggerFactory.getLogger(MLogTxtWriter.class);
+  private final File logFile;
+  private FileOutputStream logFileOutStream;
+
+  public MigrationLogWriter(String logFileName) throws FileNotFoundException {
+    logFile = SystemFileFactory.INSTANCE.getFile(logFileName);
+    if (!logFile.exists()) {
+      if (logFile.getParentFile() != null) {
+        if (logFile.getParentFile().mkdirs()) {
+          logger.info("created migrate log folder");
+        } else {
+          logger.info("create migrate log folder failed");
+        }
+      }
+    }
+    logFileOutStream = new FileOutputStream(logFile, true);
+  }
+
+  public MigrationLogWriter(File logFile) throws FileNotFoundException {
+    this.logFile = logFile;
+    logFileOutStream = new FileOutputStream(logFile, true);
+  }
+
+  private void putLog(MigrationLog log) {
+    try {
+      int type = log.type.ordinal();
+      ReadWriteIOUtils.write((byte) type, logFileOutStream);
+      ReadWriteIOUtils.write(log.taskId, logFileOutStream);
+
+      if (log.type == MigrationLog.LogType.SET) {
+        ReadWriteIOUtils.write(log.storageGroup.getFullPath(), logFileOutStream);
+        ReadWriteIOUtils.write(log.targetDirPath, logFileOutStream);
+        ReadWriteIOUtils.write(log.startTime, logFileOutStream);
+        ReadWriteIOUtils.write(log.ttl, logFileOutStream);
+      }
+    } catch (IOException e) {
+      logger.error("unable to write to migrate log");
+    }
+  }

Review Comment:
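   Please flush the log to disk after putLog; otherwise logs may be lost on a crash because they are only in the OS buffer. Note that FileOutputStream.flush() does nothing by itself (the bytes are already handed to the OS by write()), so use logFileOutStream.getFD().sync() or logFileOutStream.getChannel().force(true) to actually persist them.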



##########
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java:
##########
@@ -1555,6 +1556,72 @@ public void timedFlushSeqMemTable() {
     }
   }
 
+  /** iterate over TsFiles and migrate to targetDir if out of ttl */
+  public void checkMigration(MigrationTask task) {

Review Comment:
   Is it possible to move checkMigration and checkMigrateFile to MigrationTask?



##########
server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java:
##########
@@ -1254,6 +1257,46 @@ protected void getSeriesSchemas(InsertPlan insertPlan, VirtualStorageGroupProces
     }
   }
 
+  // push the migration info to migrationManager
+  public void setMigration(PartialPath storageGroup, File targetDir, long ttl, long startTime) {
+    migrationManager.setMigrate(storageGroup, targetDir, ttl, startTime);
+    logger.info("start check migration task successfully.");
+  }
+
+  public void unsetMigration(long taskId, PartialPath storageGroup) {
+    if (taskId != -1) {

Review Comment:
   Are negative values less than -1 legal? Maybe checking taskId >= 0 would be better.



##########
server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java:
##########
@@ -851,6 +851,9 @@ public class IoTDBConfig {
   // The max record num returned in one schema query.
   private int schemaQueryFetchSize = 10000000;
 
+  /** number of threads given to migration tasks */
+  private int migrationThread = 1;
+

Review Comment:
   Please add the parameter migration_thread_num to iotdb-datanode.properties and load it in IoTDBDescriptor.



##########
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java:
##########
@@ -535,6 +535,33 @@ public boolean removeResourceFile() {
     return true;
   }
 
+  /**
+   * Move its data file, resource file, and modification file physically.
+   *
+   * @return migrated data file
+   */
+  public File migrate(File targetDir) {
+    // get the resource and mod files
+    File resourceFile = fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX);
+    File modFile = fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX);
+
+    // get the target file locations
+    File migratedFile = fsFactory.getFile(targetDir, file.getName());
+    File migratedResourceFile = fsFactory.getFile(targetDir, file.getName() + RESOURCE_SUFFIX);
+    File migratedModificationFile =
+        fsFactory.getFile(targetDir, file.getName() + ModificationFile.FILE_SUFFIX);
+
+    // move
+    fsFactory.moveFile(file, migratedFile);
+    if (resourceFile.exists()) {
+      fsFactory.moveFile(resourceFile, migratedResourceFile);
+    }
+    if (modFile.exists()) {
+      fsFactory.moveFile(modFile, migratedModificationFile);
+    }
+    return migratedFile;
+  }

Review Comment:
   This method isn't atomic. If the system crashes or an exception occurs while this method is running, some files may still be in the original data directory rather than in the target directory. How can the system recover from this scenario? Please think of a solution to this problem.



##########
server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+
+import java.io.File;
+
+/** Data class for each Migration Task */
+public class MigrationTask {
+  private long taskId;
+  private PartialPath storageGroup;
+  private File targetDir;
+  private long startTime;
+  private long ttl;
+  private MigrationTaskStatus status = MigrationTaskStatus.READY;

Review Comment:
   Please add the volatile keyword to this field.



##########
server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java:
##########
@@ -408,6 +420,12 @@ public boolean processNonQuery(PhysicalPlan plan)
         return true;
       case SHOW_QUERY_RESOURCE:
         return processShowQueryResource();
+      case MIGRATION:
+        operateMigration((SetMigrationPlan) plan);

Review Comment:
   Please rename MIGRATION to SET_MIGRATION and operateMigration to operateSetMigration.



##########
server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java:
##########
@@ -760,6 +778,8 @@ protected QueryDataSet processShowQuery(ShowPlan showPlan, QueryContext context)
         return processShowPathsSetSchemaTemplate((ShowPathsSetTemplatePlan) showPlan);
       case PATHS_USING_SCHEMA_TEMPLATE:
         return processShowPathsUsingSchemaTemplate((ShowPathsUsingTemplatePlan) showPlan);
+      case MIGRATION:

Review Comment:
   Please rename MIGRATION to SHOW_MIGRATION.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org