You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/24 01:05:03 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-4675] Use createMultiTimeseries to optimize MLogLoader (#7673)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 4f5ed3a4e5 [To rel/0.13][IOTDB-4675] Use createMultiTimeseries to optimize MLogLoader (#7673)
4f5ed3a4e5 is described below

commit 4f5ed3a4e5e0dd3aef37ba9f422b5fbed1d09ab6
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Mon Oct 24 09:04:57 2022 +0800

    [To rel/0.13][IOTDB-4675] Use createMultiTimeseries to optimize MLogLoader (#7673)
---
 .../org/apache/iotdb/db/metadata/mtree/MTree.java  |   7 +-
 .../org/apache/iotdb/db/tools/mlog/MLogLoader.java | 229 ++++++++++++++++-----
 .../org/apache/iotdb/db/tools/MLogLoaderTest.java  |   6 +
 3 files changed, 184 insertions(+), 58 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
index 6cdbac93b4..8ec99e583b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
@@ -83,6 +83,7 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -364,7 +365,7 @@ public class MTree implements Serializable {
         throw new PathAlreadyExistException(path.getFullPath());
       }
 
-      if (alias != null && cur.hasChild(alias)) {
+      if (!StringUtils.isEmpty(alias) && cur.hasChild(alias)) {
         throw new AliasAlreadyExistException(path.getFullPath(), alias);
       }
 
@@ -387,10 +388,10 @@ public class MTree implements Serializable {
               entityMNode,
               leafName,
               new MeasurementSchema(leafName, dataType, encoding, compressor, props),
-              alias);
+              StringUtils.isEmpty(alias) ? null : alias);
       entityMNode.addChild(leafName, measurementMNode);
       // link alias to LeafMNode
-      if (alias != null) {
+      if (!StringUtils.isEmpty(alias)) {
         entityMNode.addAlias(alias, measurementMNode);
       }
       return measurementMNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogLoader.java b/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogLoader.java
index 448092f0e4..9c73c3eaed 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogLoader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.tools.mlog;
 
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.metadata.logfile.MLogReader;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.tag.TagManager;
@@ -38,12 +39,15 @@ import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.utils.CommandLineUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.util.Version;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 import org.apache.commons.cli.CommandLine;
@@ -63,6 +67,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -93,13 +99,79 @@ public class MLogLoader {
 
   private static final String HELP_ARGS = "help";
 
-  private static String mLogFile;
-  private static String tLogFile;
-  private static String host;
-  private static int port;
-  private static String user;
-  private static String password;
-  private static TagManager tagManager;
+  private Session session;
+  private String mLogFile;
+  private String tLogFile;
+  private String host;
+  private int port;
+  private String user;
+  private String password;
+  private TagManager tagManager;
+
+  // buffer
+  private static final int BATCH_NUM_THRESHOLD = 1000;
+  private static final int BATCH_MEM_THRESHOLD = 10 * 1024 * 1024;
+
+  private int batchNum;
+  private long batchMem;
+  private List<String> paths;
+  private List<TSDataType> dataTypes;
+  private List<TSEncoding> encodings;
+  private List<CompressionType> compressors;
+  private List<String> alias;
+  private List<Map<String, String>> props;
+  private List<Map<String, String>> tags;
+  private List<Map<String, String>> attributes;
+
+  // statistics
+  private long successCnt = 0;
+  private long skipCnt = 0;
+  private long failedCnt = 0;
+  // scheduled print log
+  private final ScheduledExecutorService scheduledExecutorService;
+
+  private MLogLoader(CommandLine commandLine) throws ParseException {
+    initBuffer();
+    mLogFile = CommandLineUtils.checkRequiredArg(MLOG_FILE_ARGS, MLOG_FILE_NAME, commandLine);
+    host = commandLine.getOptionValue(HOST_ARGS);
+    if (host == null) {
+      host = "127.0.0.1";
+    }
+    tLogFile = commandLine.getOptionValue(TLOG_FILE_ARGS);
+    if (tLogFile == null) {
+      logger.warn("No specify tlog.txt file to parse, tag and attributes will be ignored.");
+    }
+    String portTmp = commandLine.getOptionValue(PORT_ARGS);
+    port = portTmp == null ? 6667 : Integer.parseInt(portTmp);
+    user = commandLine.getOptionValue(USER_ARGS);
+    if (user == null) {
+      user = "root";
+    }
+    password = commandLine.getOptionValue(PASSWORD_ARGS);
+    if (password == null) {
+      password = "root";
+    }
+    session =
+        new Session.Builder()
+            .host(host)
+            .port(port)
+            .username(user)
+            .password(password)
+            .version(Version.V_0_13)
+            .build();
+    scheduledExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("MLogLogger");
+    scheduledExecutorService.scheduleWithFixedDelay(
+        () ->
+            logger.info(
+                "MLog successfully loaded {} entries, failed {} entries, and skipped {} entries",
+                successCnt,
+                failedCnt,
+                skipCnt),
+        1,
+        5,
+        TimeUnit.SECONDS);
+  }
 
   /**
    * create the commandline options.
@@ -198,69 +270,31 @@ public class MLogLoader {
       return;
     }
     try {
-      parseBasicParams(commandLine);
-      parseFileAndLoad();
+      MLogLoader mLogLoader = new MLogLoader(commandLine);
+      mLogLoader.parseFileAndLoad();
     } catch (Exception e) {
       logger.error("Encounter an error, because: {} ", e.getMessage());
     }
   }
 
-  public static void parseBasicParams(CommandLine commandLine) throws ParseException, IOException {
-    mLogFile = CommandLineUtils.checkRequiredArg(MLOG_FILE_ARGS, MLOG_FILE_NAME, commandLine);
-    host = commandLine.getOptionValue(HOST_ARGS);
-    if (host == null) {
-      host = "127.0.0.1";
-    }
-    tLogFile = commandLine.getOptionValue(TLOG_FILE_ARGS);
-    String portTmp = commandLine.getOptionValue(PORT_ARGS);
-    port = portTmp == null ? 6667 : Integer.parseInt(portTmp);
-    user = commandLine.getOptionValue(USER_ARGS);
-    if (user == null) {
-      user = "root";
-    }
-    password = commandLine.getOptionValue(PASSWORD_ARGS);
-    if (password == null) {
-      password = "root";
-    }
-  }
-
-  public static void parseFileAndLoad() throws Exception {
-    Session session =
-        new Session.Builder()
-            .host(host)
-            .port(port)
-            .username(user)
-            .password(password)
-            .version(Version.V_0_13)
-            .build();
+  public void parseFileAndLoad() throws Exception {
     try (MLogReader mLogReader = new MLogReader(mLogFile)) {
       session.open(false);
       while (mLogReader.hasNext()) {
         PhysicalPlan plan = mLogReader.next();
-        logger.info("Start load plan {}", plan);
         try {
           switch (plan.getOperatorType()) {
             case CREATE_TIMESERIES:
               CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
               if (createTimeSeriesPlan.getTagOffset() != -1) {
                 if (tLogFile == null) {
-                  logger.warn(
-                      "No specify tlog.txt file to parse, tag and attributes will be ignored.");
                   createTimeSeriesPlan.setTags(Collections.emptyMap());
                   createTimeSeriesPlan.setAttributes(Collections.emptyMap());
                 } else {
                   fillTagsAndOffset(createTimeSeriesPlan);
                 }
               }
-              session.createTimeseries(
-                  createTimeSeriesPlan.getPath().getFullPath(),
-                  createTimeSeriesPlan.getDataType(),
-                  createTimeSeriesPlan.getEncoding(),
-                  createTimeSeriesPlan.getCompressor(),
-                  createTimeSeriesPlan.getProps(),
-                  createTimeSeriesPlan.getTags(),
-                  createTimeSeriesPlan.getAttributes(),
-                  createTimeSeriesPlan.getAlias());
+              addBatchAndCheck(createTimeSeriesPlan);
               break;
             case CREATE_ALIGNED_TIMESERIES:
               CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
@@ -272,21 +306,27 @@ public class MLogLoader {
                   createAlignedTimeSeriesPlan.getEncodings(),
                   createAlignedTimeSeriesPlan.getCompressors(),
                   createAlignedTimeSeriesPlan.getAliasList());
+              successCnt++;
               break;
             case DELETE_TIMESERIES:
+              flushBuffer();
               session.deleteTimeseries(
                   plan.getPaths().stream()
                       .map(PartialPath::getFullPath)
                       .collect(Collectors.toList()));
+              successCnt++;
               break;
             case SET_STORAGE_GROUP:
               session.setStorageGroup(((SetStorageGroupPlan) plan).getPath().getFullPath());
+              successCnt++;
               break;
             case DELETE_STORAGE_GROUP:
+              flushBuffer();
               session.deleteStorageGroups(
                   plan.getPaths().stream()
                       .map(PartialPath::getFullPath)
                       .collect(Collectors.toList()));
+              successCnt++;
               break;
             case TTL:
               SetTTLPlan setTTLPlan = (SetTTLPlan) plan;
@@ -298,19 +338,24 @@ public class MLogLoader {
                     String.format(
                         "set ttl to %s %d", setTTLPlan.getStorageGroup(), setTTLPlan.getDataTTL()));
               }
+              successCnt++;
               break;
             case CHANGE_ALIAS:
+              flushBuffer();
               ChangeAliasPlan changeAliasPlan = (ChangeAliasPlan) plan;
               session.executeNonQueryStatement(
                   String.format(
                       "ALTER timeseries %s UPSERT ALIAS=%s",
                       changeAliasPlan.getPath(), changeAliasPlan.getAlias()));
+              successCnt++;
               break;
             case CHANGE_TAG_OFFSET:
+              flushBuffer();
               if (tLogFile == null) {
-                logger.warn("No specify tlog.txt file to parse, skip ChangeTagOffsetPlan.");
+                skipCnt++;
               } else {
                 session.executeNonQueryStatement(genAlterTimeSeriesSQL((ChangeTagOffsetPlan) plan));
+                successCnt++;
               }
               break;
             case CREATE_TEMPLATE:
@@ -333,6 +378,7 @@ public class MLogLoader {
                   encodings,
                   compressors,
                   template.isDirectAligned());
+              successCnt++;
               break;
             case APPEND_TEMPLATE:
               AppendTemplatePlan appendTemplatePlan = (AppendTemplatePlan) plan;
@@ -351,43 +397,59 @@ public class MLogLoader {
                     appendTemplatePlan.getEncodings(),
                     appendTemplatePlan.getCompressors());
               }
+              successCnt++;
               break;
             case PRUNE_TEMPLATE:
               PruneTemplatePlan pruneTemplatePlan = (PruneTemplatePlan) plan;
               session.deleteNodeInTemplate(
                   pruneTemplatePlan.getName(), pruneTemplatePlan.getPrunedMeasurements().get(0));
+              successCnt++;
               break;
             case SET_TEMPLATE:
+              flushBuffer();
               SetTemplatePlan setTemplatePlan = (SetTemplatePlan) plan;
               session.setSchemaTemplate(
                   setTemplatePlan.getTemplateName(), setTemplatePlan.getPrefixPath());
+              successCnt++;
               break;
             case UNSET_TEMPLATE:
               UnsetTemplatePlan unsetTemplatePlan = (UnsetTemplatePlan) plan;
               session.unsetSchemaTemplate(
                   unsetTemplatePlan.getPrefixPath(), unsetTemplatePlan.getTemplateName());
+              successCnt++;
               break;
             case DROP_TEMPLATE:
               session.dropSchemaTemplate(((DropTemplatePlan) plan).getName());
+              successCnt++;
               break;
             case ACTIVATE_TEMPLATE:
               session.createTimeseriesOfTemplateOnPath(
                   ((ActivateTemplatePlan) plan).getPrefixPath().getFullPath());
+              successCnt++;
               break;
             case DEACTIVATE_TEMPLATE:
               DeactivateTemplatePlan deactivateTemplatePlan = (DeactivateTemplatePlan) plan;
               session.deactivateTemplateOn(
                   deactivateTemplatePlan.getTemplateName(),
                   deactivateTemplatePlan.getPrefixPath().getFullPath());
+              successCnt++;
               break;
             default:
-              logger.warn("Skip load plan {}", plan);
+              // ignored unrecognizable command
           }
         } catch (Exception e) {
+          failedCnt++;
           logger.error("Fail to load plan {} because {}", plan, e.getMessage());
         }
       }
+      flushBuffer();
     } finally {
+      scheduledExecutorService.shutdown();
+      logger.info(
+          "MLog loading complete.{} entries loaded successfully, {} entries failed, and {} entries skipped.",
+          successCnt,
+          failedCnt,
+          skipCnt);
       if (tagManager != null) {
         tagManager.clear();
       }
@@ -395,8 +457,7 @@ public class MLogLoader {
     }
   }
 
-  private static void fillTagsAndOffset(CreateTimeSeriesPlan createTimeSeriesPlan)
-      throws IOException {
+  private void fillTagsAndOffset(CreateTimeSeriesPlan createTimeSeriesPlan) throws IOException {
     if (tagManager == null) {
       tagManager = new TagManager();
       File file = new File(tLogFile);
@@ -408,8 +469,7 @@ public class MLogLoader {
     createTimeSeriesPlan.setAttributes(tagAndAttributePair.right);
   }
 
-  private static String genAlterTimeSeriesSQL(ChangeTagOffsetPlan changeTagOffsetPlan)
-      throws IOException {
+  private String genAlterTimeSeriesSQL(ChangeTagOffsetPlan changeTagOffsetPlan) throws IOException {
     if (tagManager == null) {
       tagManager = new TagManager();
       File file = new File(tLogFile);
@@ -442,4 +502,63 @@ public class MLogLoader {
     }
     return stringBuilder.toString();
   }
+
+  private void initBuffer() {
+    paths = new ArrayList<>();
+    dataTypes = new ArrayList<>();
+    encodings = new ArrayList<>();
+    compressors = new ArrayList<>();
+    alias = new ArrayList<>();
+    props = new ArrayList<>();
+    tags = new ArrayList<>();
+    attributes = new ArrayList<>();
+    batchNum = 0;
+    batchMem =
+        RamUsageEstimator.sizeOf(paths)
+            + RamUsageEstimator.sizeOf(dataTypes)
+            + RamUsageEstimator.sizeOf(encodings)
+            + RamUsageEstimator.sizeOf(compressors)
+            + RamUsageEstimator.sizeOf(alias)
+            + RamUsageEstimator.sizeOf(props)
+            + RamUsageEstimator.sizeOf(tags)
+            + RamUsageEstimator.sizeOf(attributes);
+  }
+
+  private void addBatchAndCheck(CreateTimeSeriesPlan plan)
+      throws IoTDBConnectionException, StatementExecutionException {
+    paths.add(plan.getPath().getFullPath());
+    dataTypes.add(plan.getDataType());
+    encodings.add(plan.getEncoding());
+    compressors.add(plan.getCompressor());
+    props.add(plan.getProps() == null ? Collections.emptyMap() : plan.getProps());
+    tags.add(plan.getTags() == null ? Collections.emptyMap() : plan.getTags());
+    attributes.add(plan.getAttributes() == null ? Collections.emptyMap() : plan.getAttributes());
+    alias.add(plan.getAlias() == null ? "" : plan.getAlias());
+    batchNum += 1;
+    batchMem +=
+        (RamUsageEstimator.sizeOf(plan.getPath().getFullPath())
+            + RamUsageEstimator.sizeOf(plan.getDataType())
+            + RamUsageEstimator.sizeOf(plan.getEncoding())
+            + RamUsageEstimator.sizeOf(plan.getCompressor())
+            + RamUsageEstimator.sizeOf(
+                plan.getProps() == null ? Collections.emptyMap() : plan.getProps())
+            + RamUsageEstimator.sizeOf(
+                plan.getTags() == null ? Collections.emptyMap() : plan.getTags())
+            + RamUsageEstimator.sizeOf(
+                plan.getAttributes() == null ? Collections.emptyMap() : plan.getAttributes())
+            + RamUsageEstimator.sizeOf(plan.getAlias() == null ? "" : plan.getAlias()));
+    if (batchNum >= BATCH_NUM_THRESHOLD || batchMem >= BATCH_MEM_THRESHOLD) {
+      flushBuffer();
+    }
+  }
+
+  private void flushBuffer() throws IoTDBConnectionException, StatementExecutionException {
+    if (batchNum > 0) {
+      logger.info("Flush buffer and CreateMultiTimeseries.");
+      session.createMultiTimeseries(
+          paths, dataTypes, encodings, compressors, props, tags, attributes, alias);
+      successCnt += batchNum;
+    }
+    initBuffer();
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/MLogLoaderTest.java b/server/src/test/java/org/apache/iotdb/db/tools/MLogLoaderTest.java
index 7716e8e802..0860c433b0 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/MLogLoaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/tools/MLogLoaderTest.java
@@ -107,6 +107,12 @@ public class MLogLoaderTest {
     createTimeSeriesPlan3.setEncoding(TSEncoding.PLAIN);
     createTimeSeriesPlan3.setCompressor(CompressionType.GZIP);
     IoTDB.metaManager.createTimeseries(createTimeSeriesPlan3);
+    CreateTimeSeriesPlan createTimeSeriesPlan4 = new CreateTimeSeriesPlan();
+    createTimeSeriesPlan4.setPath(new PartialPath("root.sg1.device1.s4"));
+    createTimeSeriesPlan4.setDataType(TSDataType.DOUBLE);
+    createTimeSeriesPlan4.setEncoding(TSEncoding.PLAIN);
+    createTimeSeriesPlan4.setCompressor(CompressionType.GZIP);
+    IoTDB.metaManager.createTimeseries(createTimeSeriesPlan4);
     IoTDB.metaManager.createAlignedTimeSeries(
         new PartialPath("root.laptop.d1.aligned_device"),
         Arrays.asList("s3", "s4", "s5"),