You are viewing a plain-text version of this content. The canonical link for it is not preserved in this plain-text rendering.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/24 01:05:03 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-4675] Use createMultiTimeseries to optimize MLogLoader (#7673)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 4f5ed3a4e5 [To rel/0.13][IOTDB-4675] Use createMultiTimeseries to optimize MLogLoader (#7673)
4f5ed3a4e5 is described below
commit 4f5ed3a4e5e0dd3aef37ba9f422b5fbed1d09ab6
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Mon Oct 24 09:04:57 2022 +0800
[To rel/0.13][IOTDB-4675] Use createMultiTimeseries to optimize MLogLoader (#7673)
---
.../org/apache/iotdb/db/metadata/mtree/MTree.java | 7 +-
.../org/apache/iotdb/db/tools/mlog/MLogLoader.java | 229 ++++++++++++++++-----
.../org/apache/iotdb/db/tools/MLogLoaderTest.java | 6 +
3 files changed, 184 insertions(+), 58 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
index 6cdbac93b4..8ec99e583b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
@@ -83,6 +83,7 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -364,7 +365,7 @@ public class MTree implements Serializable {
throw new PathAlreadyExistException(path.getFullPath());
}
- if (alias != null && cur.hasChild(alias)) {
+ if (!StringUtils.isEmpty(alias) && cur.hasChild(alias)) {
throw new AliasAlreadyExistException(path.getFullPath(), alias);
}
@@ -387,10 +388,10 @@ public class MTree implements Serializable {
entityMNode,
leafName,
new MeasurementSchema(leafName, dataType, encoding, compressor, props),
- alias);
+ StringUtils.isEmpty(alias) ? null : alias);
entityMNode.addChild(leafName, measurementMNode);
// link alias to LeafMNode
- if (alias != null) {
+ if (!StringUtils.isEmpty(alias)) {
entityMNode.addAlias(alias, measurementMNode);
}
return measurementMNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogLoader.java b/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogLoader.java
index 448092f0e4..9c73c3eaed 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogLoader.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.tools.mlog;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.metadata.logfile.MLogReader;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.tag.TagManager;
@@ -38,12 +39,15 @@ import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.utils.CommandLineUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.util.Version;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.commons.cli.CommandLine;
@@ -63,6 +67,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@@ -93,13 +99,79 @@ public class MLogLoader {
private static final String HELP_ARGS = "help";
- private static String mLogFile;
- private static String tLogFile;
- private static String host;
- private static int port;
- private static String user;
- private static String password;
- private static TagManager tagManager;
+ private Session session;
+ private String mLogFile;
+ private String tLogFile;
+ private String host;
+ private int port;
+ private String user;
+ private String password;
+ private TagManager tagManager;
+
+ // buffer
+ private static final int BATCH_NUM_THRESHOLD = 1000;
+ private static final int BATCH_MEM_THRESHOLD = 10 * 1024 * 1024;
+
+ private int batchNum;
+ private long batchMem;
+ private List<String> paths;
+ private List<TSDataType> dataTypes;
+ private List<TSEncoding> encodings;
+ private List<CompressionType> compressors;
+ private List<String> alias;
+ private List<Map<String, String>> props;
+ private List<Map<String, String>> tags;
+ private List<Map<String, String>> attributes;
+
+ // statistics
+ private long successCnt = 0;
+ private long skipCnt = 0;
+ private long failedCnt = 0;
+ // scheduled print log
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ private MLogLoader(CommandLine commandLine) throws ParseException {
+ initBuffer();
+ mLogFile = CommandLineUtils.checkRequiredArg(MLOG_FILE_ARGS, MLOG_FILE_NAME, commandLine);
+ host = commandLine.getOptionValue(HOST_ARGS);
+ if (host == null) {
+ host = "127.0.0.1";
+ }
+ tLogFile = commandLine.getOptionValue(TLOG_FILE_ARGS);
+ if (tLogFile == null) {
+ logger.warn("No specify tlog.txt file to parse, tag and attributes will be ignored.");
+ }
+ String portTmp = commandLine.getOptionValue(PORT_ARGS);
+ port = portTmp == null ? 6667 : Integer.parseInt(portTmp);
+ user = commandLine.getOptionValue(USER_ARGS);
+ if (user == null) {
+ user = "root";
+ }
+ password = commandLine.getOptionValue(PASSWORD_ARGS);
+ if (password == null) {
+ password = "root";
+ }
+ session =
+ new Session.Builder()
+ .host(host)
+ .port(port)
+ .username(user)
+ .password(password)
+ .version(Version.V_0_13)
+ .build();
+ scheduledExecutorService =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("MLogLogger");
+ scheduledExecutorService.scheduleWithFixedDelay(
+ () ->
+ logger.info(
+ "MLog successfully loaded {} entries, failed {} entries, and skipped {} entries",
+ successCnt,
+ failedCnt,
+ skipCnt),
+ 1,
+ 5,
+ TimeUnit.SECONDS);
+ }
/**
* create the commandline options.
@@ -198,69 +270,31 @@ public class MLogLoader {
return;
}
try {
- parseBasicParams(commandLine);
- parseFileAndLoad();
+ MLogLoader mLogLoader = new MLogLoader(commandLine);
+ mLogLoader.parseFileAndLoad();
} catch (Exception e) {
logger.error("Encounter an error, because: {} ", e.getMessage());
}
}
- public static void parseBasicParams(CommandLine commandLine) throws ParseException, IOException {
- mLogFile = CommandLineUtils.checkRequiredArg(MLOG_FILE_ARGS, MLOG_FILE_NAME, commandLine);
- host = commandLine.getOptionValue(HOST_ARGS);
- if (host == null) {
- host = "127.0.0.1";
- }
- tLogFile = commandLine.getOptionValue(TLOG_FILE_ARGS);
- String portTmp = commandLine.getOptionValue(PORT_ARGS);
- port = portTmp == null ? 6667 : Integer.parseInt(portTmp);
- user = commandLine.getOptionValue(USER_ARGS);
- if (user == null) {
- user = "root";
- }
- password = commandLine.getOptionValue(PASSWORD_ARGS);
- if (password == null) {
- password = "root";
- }
- }
-
- public static void parseFileAndLoad() throws Exception {
- Session session =
- new Session.Builder()
- .host(host)
- .port(port)
- .username(user)
- .password(password)
- .version(Version.V_0_13)
- .build();
+ public void parseFileAndLoad() throws Exception {
try (MLogReader mLogReader = new MLogReader(mLogFile)) {
session.open(false);
while (mLogReader.hasNext()) {
PhysicalPlan plan = mLogReader.next();
- logger.info("Start load plan {}", plan);
try {
switch (plan.getOperatorType()) {
case CREATE_TIMESERIES:
CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
if (createTimeSeriesPlan.getTagOffset() != -1) {
if (tLogFile == null) {
- logger.warn(
- "No specify tlog.txt file to parse, tag and attributes will be ignored.");
createTimeSeriesPlan.setTags(Collections.emptyMap());
createTimeSeriesPlan.setAttributes(Collections.emptyMap());
} else {
fillTagsAndOffset(createTimeSeriesPlan);
}
}
- session.createTimeseries(
- createTimeSeriesPlan.getPath().getFullPath(),
- createTimeSeriesPlan.getDataType(),
- createTimeSeriesPlan.getEncoding(),
- createTimeSeriesPlan.getCompressor(),
- createTimeSeriesPlan.getProps(),
- createTimeSeriesPlan.getTags(),
- createTimeSeriesPlan.getAttributes(),
- createTimeSeriesPlan.getAlias());
+ addBatchAndCheck(createTimeSeriesPlan);
break;
case CREATE_ALIGNED_TIMESERIES:
CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
@@ -272,21 +306,27 @@ public class MLogLoader {
createAlignedTimeSeriesPlan.getEncodings(),
createAlignedTimeSeriesPlan.getCompressors(),
createAlignedTimeSeriesPlan.getAliasList());
+ successCnt++;
break;
case DELETE_TIMESERIES:
+ flushBuffer();
session.deleteTimeseries(
plan.getPaths().stream()
.map(PartialPath::getFullPath)
.collect(Collectors.toList()));
+ successCnt++;
break;
case SET_STORAGE_GROUP:
session.setStorageGroup(((SetStorageGroupPlan) plan).getPath().getFullPath());
+ successCnt++;
break;
case DELETE_STORAGE_GROUP:
+ flushBuffer();
session.deleteStorageGroups(
plan.getPaths().stream()
.map(PartialPath::getFullPath)
.collect(Collectors.toList()));
+ successCnt++;
break;
case TTL:
SetTTLPlan setTTLPlan = (SetTTLPlan) plan;
@@ -298,19 +338,24 @@ public class MLogLoader {
String.format(
"set ttl to %s %d", setTTLPlan.getStorageGroup(), setTTLPlan.getDataTTL()));
}
+ successCnt++;
break;
case CHANGE_ALIAS:
+ flushBuffer();
ChangeAliasPlan changeAliasPlan = (ChangeAliasPlan) plan;
session.executeNonQueryStatement(
String.format(
"ALTER timeseries %s UPSERT ALIAS=%s",
changeAliasPlan.getPath(), changeAliasPlan.getAlias()));
+ successCnt++;
break;
case CHANGE_TAG_OFFSET:
+ flushBuffer();
if (tLogFile == null) {
- logger.warn("No specify tlog.txt file to parse, skip ChangeTagOffsetPlan.");
+ skipCnt++;
} else {
session.executeNonQueryStatement(genAlterTimeSeriesSQL((ChangeTagOffsetPlan) plan));
+ successCnt++;
}
break;
case CREATE_TEMPLATE:
@@ -333,6 +378,7 @@ public class MLogLoader {
encodings,
compressors,
template.isDirectAligned());
+ successCnt++;
break;
case APPEND_TEMPLATE:
AppendTemplatePlan appendTemplatePlan = (AppendTemplatePlan) plan;
@@ -351,43 +397,59 @@ public class MLogLoader {
appendTemplatePlan.getEncodings(),
appendTemplatePlan.getCompressors());
}
+ successCnt++;
break;
case PRUNE_TEMPLATE:
PruneTemplatePlan pruneTemplatePlan = (PruneTemplatePlan) plan;
session.deleteNodeInTemplate(
pruneTemplatePlan.getName(), pruneTemplatePlan.getPrunedMeasurements().get(0));
+ successCnt++;
break;
case SET_TEMPLATE:
+ flushBuffer();
SetTemplatePlan setTemplatePlan = (SetTemplatePlan) plan;
session.setSchemaTemplate(
setTemplatePlan.getTemplateName(), setTemplatePlan.getPrefixPath());
+ successCnt++;
break;
case UNSET_TEMPLATE:
UnsetTemplatePlan unsetTemplatePlan = (UnsetTemplatePlan) plan;
session.unsetSchemaTemplate(
unsetTemplatePlan.getPrefixPath(), unsetTemplatePlan.getTemplateName());
+ successCnt++;
break;
case DROP_TEMPLATE:
session.dropSchemaTemplate(((DropTemplatePlan) plan).getName());
+ successCnt++;
break;
case ACTIVATE_TEMPLATE:
session.createTimeseriesOfTemplateOnPath(
((ActivateTemplatePlan) plan).getPrefixPath().getFullPath());
+ successCnt++;
break;
case DEACTIVATE_TEMPLATE:
DeactivateTemplatePlan deactivateTemplatePlan = (DeactivateTemplatePlan) plan;
session.deactivateTemplateOn(
deactivateTemplatePlan.getTemplateName(),
deactivateTemplatePlan.getPrefixPath().getFullPath());
+ successCnt++;
break;
default:
- logger.warn("Skip load plan {}", plan);
+ // ignored unrecognizable command
}
} catch (Exception e) {
+ failedCnt++;
logger.error("Fail to load plan {} because {}", plan, e.getMessage());
}
}
+ flushBuffer();
} finally {
+ scheduledExecutorService.shutdown();
+ logger.info(
+ "MLog loading complete.{} entries loaded successfully, {} entries failed, and {} entries skipped.",
+ successCnt,
+ failedCnt,
+ skipCnt);
if (tagManager != null) {
tagManager.clear();
}
@@ -395,8 +457,7 @@ public class MLogLoader {
}
}
- private static void fillTagsAndOffset(CreateTimeSeriesPlan createTimeSeriesPlan)
- throws IOException {
+ private void fillTagsAndOffset(CreateTimeSeriesPlan createTimeSeriesPlan) throws IOException {
if (tagManager == null) {
tagManager = new TagManager();
File file = new File(tLogFile);
@@ -408,8 +469,7 @@ public class MLogLoader {
createTimeSeriesPlan.setAttributes(tagAndAttributePair.right);
}
- private static String genAlterTimeSeriesSQL(ChangeTagOffsetPlan changeTagOffsetPlan)
- throws IOException {
+ private String genAlterTimeSeriesSQL(ChangeTagOffsetPlan changeTagOffsetPlan) throws IOException {
if (tagManager == null) {
tagManager = new TagManager();
File file = new File(tLogFile);
@@ -442,4 +502,63 @@ public class MLogLoader {
}
return stringBuilder.toString();
}
+
+ private void initBuffer() {
+ paths = new ArrayList<>();
+ dataTypes = new ArrayList<>();
+ encodings = new ArrayList<>();
+ compressors = new ArrayList<>();
+ alias = new ArrayList<>();
+ props = new ArrayList<>();
+ tags = new ArrayList<>();
+ attributes = new ArrayList<>();
+ batchNum = 0;
+ batchMem =
+ RamUsageEstimator.sizeOf(paths)
+ + RamUsageEstimator.sizeOf(dataTypes)
+ + RamUsageEstimator.sizeOf(encodings)
+ + RamUsageEstimator.sizeOf(compressors)
+ + RamUsageEstimator.sizeOf(alias)
+ + RamUsageEstimator.sizeOf(props)
+ + RamUsageEstimator.sizeOf(tags)
+ + RamUsageEstimator.sizeOf(attributes);
+ }
+
+ private void addBatchAndCheck(CreateTimeSeriesPlan plan)
+ throws IoTDBConnectionException, StatementExecutionException {
+ paths.add(plan.getPath().getFullPath());
+ dataTypes.add(plan.getDataType());
+ encodings.add(plan.getEncoding());
+ compressors.add(plan.getCompressor());
+ props.add(plan.getProps() == null ? Collections.emptyMap() : plan.getProps());
+ tags.add(plan.getTags() == null ? Collections.emptyMap() : plan.getTags());
+ attributes.add(plan.getAttributes() == null ? Collections.emptyMap() : plan.getAttributes());
+ alias.add(plan.getAlias() == null ? "" : plan.getAlias());
+ batchNum += 1;
+ batchMem +=
+ (RamUsageEstimator.sizeOf(plan.getPath().getFullPath())
+ + RamUsageEstimator.sizeOf(plan.getDataType())
+ + RamUsageEstimator.sizeOf(plan.getEncoding())
+ + RamUsageEstimator.sizeOf(plan.getCompressor())
+ + RamUsageEstimator.sizeOf(
+ plan.getProps() == null ? Collections.emptyMap() : plan.getProps())
+ + RamUsageEstimator.sizeOf(
+ plan.getTags() == null ? Collections.emptyMap() : plan.getTags())
+ + RamUsageEstimator.sizeOf(
+ plan.getAttributes() == null ? Collections.emptyMap() : plan.getAttributes())
+ + RamUsageEstimator.sizeOf(plan.getAlias() == null ? "" : plan.getAlias()));
+ if (batchNum >= BATCH_NUM_THRESHOLD || batchMem >= BATCH_MEM_THRESHOLD) {
+ flushBuffer();
+ }
+ }
+
+ private void flushBuffer() throws IoTDBConnectionException, StatementExecutionException {
+ if (batchNum > 0) {
+ logger.info("Flush buffer and CreateMultiTimeseries.");
+ session.createMultiTimeseries(
+ paths, dataTypes, encodings, compressors, props, tags, attributes, alias);
+ successCnt += batchNum;
+ }
+ initBuffer();
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/MLogLoaderTest.java b/server/src/test/java/org/apache/iotdb/db/tools/MLogLoaderTest.java
index 7716e8e802..0860c433b0 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/MLogLoaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/tools/MLogLoaderTest.java
@@ -107,6 +107,12 @@ public class MLogLoaderTest {
createTimeSeriesPlan3.setEncoding(TSEncoding.PLAIN);
createTimeSeriesPlan3.setCompressor(CompressionType.GZIP);
IoTDB.metaManager.createTimeseries(createTimeSeriesPlan3);
+ CreateTimeSeriesPlan createTimeSeriesPlan4 = new CreateTimeSeriesPlan();
+ createTimeSeriesPlan4.setPath(new PartialPath("root.sg1.device1.s4"));
+ createTimeSeriesPlan4.setDataType(TSDataType.DOUBLE);
+ createTimeSeriesPlan4.setEncoding(TSEncoding.PLAIN);
+ createTimeSeriesPlan4.setCompressor(CompressionType.GZIP);
+ IoTDB.metaManager.createTimeseries(createTimeSeriesPlan4);
IoTDB.metaManager.createAlignedTimeSeries(
new PartialPath("root.laptop.d1.aligned_device"),
Arrays.asList("s3", "s4", "s5"),