You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by li...@apache.org on 2019/11/13 10:22:18 UTC
[incubator-iotdb] 01/01: feat(Monitor): add monitor for insert
point and request
This is an automated email from the ASF dual-hosted git repository.
liurui pushed a commit to branch 0.8.1-quick-fix
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 4813ef57db231d38d68bea1f6d968bade259cd31
Author: liuruiyiyang <24...@qq.com>
AuthorDate: Wed Nov 13 18:21:56 2019 +0800
feat(Monitor): add monitor for insert point and request
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 77 ++++++++++++++++++++-
.../apache/iotdb/db/monitor/MonitorConstants.java | 10 +--
.../org/apache/iotdb/db/monitor/StatMonitor.java | 26 ++------
.../iotdb/db/monitor/collector/FileSize.java | 1 +
.../java/org/apache/iotdb/db/service/IoTDB.java | 14 ++--
.../org/apache/iotdb/db/service/TSServiceImpl.java | 78 +++++++++++++++++++++-
6 files changed, 171 insertions(+), 35 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 49b3692..166e326 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -20,10 +20,14 @@ package org.apache.iotdb.db.engine;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -36,30 +40,39 @@ import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StorageEngineFailureException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.monitor.IStatistic;
+import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants;
+import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StorageEngine implements IService {
+public class StorageEngine implements IService, IStatistic {
private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static final String METRIC_PREFIX = MonitorConstants.STAT_STORAGE_DELTA_NAME;
/**
* a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
* will have a subfolder under the systemDir.
*/
private final String systemDir;
+ private static AtomicLong totalInsertPoint = new AtomicLong(0);
+
/**
* storage group name -> storage group processor
*/
@@ -94,6 +107,12 @@ public class StorageEngine implements IService {
logger.error("init a storage group processor failed. ", e);
throw new StorageEngineFailureException(e);
}
+
+ if (config.isEnableStatMonitor()) {
+ StatMonitor statMonitor = StatMonitor.getInstance();
+ registerStatMetadata();
+ statMonitor.registerStatistics(METRIC_PREFIX, this);
+ }
}
@Override
@@ -153,7 +172,7 @@ public class StorageEngine implements IService {
* @return true if and only if this insertion succeeds
*/
public boolean insert(InsertPlan insertPlan) throws StorageEngineException {
-
+ totalInsertPoint.addAndGet(insertPlan.getMeasurements().length);
StorageGroupProcessor storageGroupProcessor;
try {
storageGroupProcessor = getProcessor(insertPlan.getDeviceId());
@@ -321,4 +340,58 @@ public class StorageEngine implements IService {
return true;
}
+  /** Returns a one-entry map: METRIC_PREFIX -> the TSRecord StatMonitor builds from the counters. */
+  @Override
+  public Map<String, TSRecord> getAllStatisticsValue() {
+    long now = System.currentTimeMillis();
+    Map<String, AtomicLong> counters = getStatParamsHashMap();
+    TSRecord snapshot = StatMonitor.convertToTSRecord(counters, METRIC_PREFIX, now);
+    Map<String, TSRecord> result = new HashMap<>();
+    result.put(METRIC_PREFIX, snapshot);
+    return result;
+  }
+
+  /** Adds one time series per insert-point counter and reports the path -> type map to StatMonitor. */
+  @Override
+  public void registerStatMetadata() {
+    Map<String, String> seriesTypes = new HashMap<>();
+    for (FileNodeManagerStatConstants kind : FileNodeManagerStatConstants.values()) {
+      String seriesPath = METRIC_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR + kind.name();
+      seriesTypes.put(seriesPath, MonitorConstants.DATA_TYPE_INT64);
+      Path path = new Path(seriesPath);
+      try {
+        addTimeSeries(path, TSDataType.valueOf(MonitorConstants.DATA_TYPE_INT64),
+            TSEncoding.valueOf("RLE"), CompressionType.valueOf(TSFileConfig.compressor),
+            Collections.emptyMap());
+      } catch (StorageEngineException e) {
+        // Best effort: log which series failed and keep registering the others.
+        logger.error("Register insert point stats {} into storageEngine failed.", seriesPath, e);
+      }
+    }
+    StatMonitor.getInstance().registerStatStorageGroup(seriesTypes);
+  }
+
+  /** Lists the full series path (prefix + separator + counter name) of every insert counter. */
+  @Override
+  public List<String> getAllPathForStatistic() {
+    String seriesPrefix = METRIC_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR;
+    List<String> paths = new ArrayList<>();
+    for (FileNodeManagerStatConstants kind : FileNodeManagerStatConstants.values()) {
+      paths.add(seriesPrefix + kind.name());
+    }
+    return paths;
+  }
+
+  /** Builds a fresh name -> value map of the insert counters; TOTAL_POINTS_FAIL is always 0. */
+  @Override
+  public Map<String, AtomicLong> getStatParamsHashMap() {
+    long insertedPoints = totalInsertPoint.get();
+    Map<String, AtomicLong> counters = new HashMap<>();
+    // Every call wraps the values in new AtomicLong objects rather than exposing the live counter.
+    counters.put(FileNodeManagerStatConstants.TOTAL_POINTS_SUCCESS.name(),
+        new AtomicLong(insertedPoints));
+    counters.put(FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name(), new AtomicLong(0L));
+    return counters;
+  }
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java b/server/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java
index 469a374..3ea3631 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java
@@ -40,7 +40,7 @@ public class MonitorConstants {
public static final String FILE_SIZE_STORAGE_GROUP_NAME = STAT_STORAGE_GROUP_PREFIX
+ MONITOR_PATH_SEPARATOR + FILE_SIZE;
// statistic for insert module
- static final String FILE_NODE_MANAGER_PATH = "write.global";
+ private static final String FILE_NODE_MANAGER_PATH = "write.global";
public static final String FILE_NODE_PATH = "write";
/**
* Stat information.
@@ -58,7 +58,7 @@ public class MonitorConstants {
HashMap<String, AtomicLong> hashMap = new HashMap<>();
switch (constantsType) {
case FILENODE_PROCESSOR_CONST:
- for (FileNodeProcessorStatConstants statConstant : FileNodeProcessorStatConstants
+ for (RequestConstants statConstant : RequestConstants
.values()) {
hashMap.put(statConstant.name(), new AtomicLong(0));
}
@@ -79,11 +79,11 @@ public class MonitorConstants {
}
public enum FileNodeManagerStatConstants {
- TOTAL_POINTS, TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL
+ TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL
}
- public enum FileNodeProcessorStatConstants {
- TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL
+ public enum RequestConstants {
+ TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL
}
public enum OsStatConstants {
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index 30ce3ea..9d8012c 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -35,8 +35,6 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants;
-import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeProcessorStatConstants;
import org.apache.iotdb.db.monitor.collector.FileSize;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.service.IService;
@@ -95,12 +93,12 @@ public class StatMonitor implements IService {
}
private void initTemporaryStatList() {
- for (FileNodeManagerStatConstants constants : FileNodeManagerStatConstants.values()) {
- temporaryStatList.add(constants.name());
- }
- for (FileNodeProcessorStatConstants constants : FileNodeProcessorStatConstants.values()) {
- temporaryStatList.add(constants.name());
- }
+// for (FileNodeManagerStatConstants constants : FileNodeManagerStatConstants.values()) {
+// temporaryStatList.add(constants.name());
+// }
+// for (RequestConstants constants : RequestConstants.values()) {
+// temporaryStatList.add(constants.name());
+// }
}
public static StatMonitor getInstance() {
@@ -138,18 +136,6 @@ public class StatMonitor implements IService {
return numInsertError.get();
}
- void registerStatStorageGroup() {
- MManager mManager = MManager.getInstance();
- String prefix = MonitorConstants.STAT_STORAGE_GROUP_PREFIX;
- try {
- if (!mManager.pathExist(prefix)) {
- mManager.setStorageLevelToMTree(prefix);
- }
- } catch (Exception e) {
- logger.error("MManager cannot set storage level to MTree.", e);
- }
- }
-
/**
* register monitor statistics time series metadata into MManager.
*
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java b/server/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java
index 01d4b52..840f103 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.monitor.IStatistic;
import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants;
import org.apache.iotdb.db.monitor.MonitorConstants.FileSizeConstants;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 6d871e3..a56d5f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -80,13 +80,6 @@ public class IoTDB implements IoTDBMBean {
IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
IoTDBDescriptor.getInstance().getConfig().setEnableWal(enableWAL);
- // When registering statMonitor, we should start recovering some statistics
- // with latest values stored
- // Warn: registMonitor() method should be called after systemDataRecovery()
- if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
- StatMonitor.getInstance().recovery();
- }
-
initMManager();
registerManager.register(StorageEngine.getInstance());
registerManager.register(MultiFileLogNodeManager.getInstance());
@@ -100,6 +93,13 @@ public class IoTDB implements IoTDBMBean {
JMXService.registerMBean(getInstance(), mbeanName);
+ // When registering statMonitor, we should start recovering some statistics
+ // from the latest stored values.
+ // Warning: registMonitor() should be called after systemDataRecovery().
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+ StatMonitor.getInstance().recovery();
+ }
+
logger.info("IoTDB is set up.");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 4aa0a9e..79de7ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -28,6 +28,8 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -55,6 +57,11 @@ import org.apache.iotdb.db.exception.qp.IllegalASTFormatException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.Metadata;
+import org.apache.iotdb.db.monitor.IStatistic;
+import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants;
+import org.apache.iotdb.db.monitor.MonitorConstants.RequestConstants;
+import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.qp.QueryProcessor;
import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -92,11 +99,15 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
import org.apache.iotdb.service.rpc.thrift.TS_Status;
import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.thrift.TException;
import org.apache.thrift.server.ServerContext;
import org.slf4j.Logger;
@@ -106,11 +117,14 @@ import org.slf4j.LoggerFactory;
* Thrift RPC implementation at server side.
*/
-public class TSServiceImpl implements TSIService.Iface, ServerContext {
+public class TSServiceImpl implements TSIService.Iface, ServerContext, IStatistic {
private static final Logger logger = LoggerFactory.getLogger(TSServiceImpl.class);
private static final String INFO_NOT_LOGIN = "{}: Not login.";
private static final String ERROR_NOT_LOGIN = "Not login";
+ private static AtomicLong totalRequestNum = new AtomicLong(0);
+ private static final String METRIC_PREFIX = MonitorConstants.STAT_STORAGE_DELTA_NAME + ".request";
+ private StorageEngine storageEngine;
protected QueryProcessor processor;
// Record the username for every rpc connection. Username.get() is null if
@@ -129,6 +143,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
public TSServiceImpl() throws IOException {
processor = new QueryProcessor(new QueryProcessExecutor());
+ storageEngine = StorageEngine.getInstance();
+
+ if (config.isEnableStatMonitor()) {
+ StatMonitor statMonitor = StatMonitor.getInstance();
+ registerStatMetadata();
+ statMonitor.registerStatistics(METRIC_PREFIX, this);
+ }
}
@Override
@@ -411,6 +432,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
long t1 = System.currentTimeMillis();
+ totalRequestNum.incrementAndGet();
String currStmt = null;
List<Integer> result = new ArrayList<>();
try {
@@ -931,5 +953,59 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
public long requestStatementId() {
return globalStmtId.incrementAndGet();
}
+
+  /** Returns a one-entry map from METRIC_PREFIX to the TSRecord built from the request counters. */
+  @Override
+  public Map<String, TSRecord> getAllStatisticsValue() {
+    long timestamp = System.currentTimeMillis();
+    Map<String, AtomicLong> requestStats = getStatParamsHashMap();
+    TSRecord tsRecord = StatMonitor.convertToTSRecord(requestStats, METRIC_PREFIX, timestamp);
+    Map<String, TSRecord> recordsByPrefix = new HashMap<>();
+    recordsByPrefix.put(METRIC_PREFIX, tsRecord);
+    return recordsByPrefix;
+  }
+
+  /** Adds one time series per request counter and reports the path -> type map to StatMonitor. */
+  @Override
+  public void registerStatMetadata() {
+    Map<String, String> seriesTypes = new HashMap<>();
+    for (RequestConstants kind : RequestConstants.values()) {
+      String seriesPath = METRIC_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR + kind.name();
+      seriesTypes.put(seriesPath, MonitorConstants.DATA_TYPE_INT64);
+      Path path = new Path(seriesPath);
+      try {
+        storageEngine.addTimeSeries(path, TSDataType.valueOf(MonitorConstants.DATA_TYPE_INT64),
+            TSEncoding.valueOf("RLE"), CompressionType.valueOf(TSFileConfig.compressor),
+            Collections.emptyMap());
+      } catch (StorageEngineException e) {
+        // Best effort: log which series failed and keep registering the others.
+        logger.error("Register request stats {} into storageEngine failed.", seriesPath, e);
+      }
+    }
+    StatMonitor.getInstance().registerStatStorageGroup(seriesTypes);
+  }
+
+  /** Lists the full series path (prefix + separator + counter name) of every request counter. */
+  @Override
+  public List<String> getAllPathForStatistic() {
+    String seriesPrefix = METRIC_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR;
+    List<String> paths = new ArrayList<>();
+    for (RequestConstants kind : RequestConstants.values()) {
+      paths.add(seriesPrefix + kind.name());
+    }
+    return paths;
+  }
+
+  /** Builds a fresh name -> value map of the request counters; TOTAL_REQ_FAIL is always 0. */
+  @Override
+  public Map<String, AtomicLong> getStatParamsHashMap() {
+    long requestCount = totalRequestNum.get();
+    Map<String, AtomicLong> counters = new HashMap<>();
+    // totalRequestNum is published under the TOTAL_REQ_SUCCESS name.
+    counters.put(RequestConstants.TOTAL_REQ_SUCCESS.name(),
+        new AtomicLong(requestCount));
+    counters.put(RequestConstants.TOTAL_REQ_FAIL.name(), new AtomicLong(0L));
+    return counters;
+  }
}