You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by li...@apache.org on 2019/11/13 10:22:18 UTC

[incubator-iotdb] 01/01: feat(Monitor): add monitor for insert point and request

This is an automated email from the ASF dual-hosted git repository.

liurui pushed a commit to branch 0.8.1-quick-fix
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 4813ef57db231d38d68bea1f6d968bade259cd31
Author: liuruiyiyang <24...@qq.com>
AuthorDate: Wed Nov 13 18:21:56 2019 +0800

    feat(Monitor): add monitor for insert point and request
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 77 ++++++++++++++++++++-
 .../apache/iotdb/db/monitor/MonitorConstants.java  | 10 +--
 .../org/apache/iotdb/db/monitor/StatMonitor.java   | 26 ++------
 .../iotdb/db/monitor/collector/FileSize.java       |  1 +
 .../java/org/apache/iotdb/db/service/IoTDB.java    | 14 ++--
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 78 +++++++++++++++++++++-
 6 files changed, 171 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 49b3692..166e326 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -20,10 +20,14 @@ package org.apache.iotdb.db.engine;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -36,30 +40,39 @@ import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.StorageEngineFailureException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.monitor.IStatistic;
+import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants;
+import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StorageEngine implements IService {
+public class StorageEngine implements IService, IStatistic {
 
   private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
+  private static final String METRIC_PREFIX = MonitorConstants.STAT_STORAGE_DELTA_NAME;
   /**
    * a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
    * will have a subfolder under the systemDir.
    */
   private final String systemDir;
 
+  private static AtomicLong totalInsertPoint = new AtomicLong(0);
+
   /**
    * storage group name -> storage group processor
    */
@@ -94,6 +107,12 @@ public class StorageEngine implements IService {
       logger.error("init a storage group processor failed. ", e);
       throw new StorageEngineFailureException(e);
     }
+
+    if (config.isEnableStatMonitor()) {
+      StatMonitor statMonitor = StatMonitor.getInstance();
+      registerStatMetadata();
+      statMonitor.registerStatistics(METRIC_PREFIX, this);
+    }
   }
 
   @Override
@@ -153,7 +172,7 @@ public class StorageEngine implements IService {
    * @return true if and only if this insertion succeeds
    */
   public boolean insert(InsertPlan insertPlan) throws StorageEngineException {
-
+    totalInsertPoint.addAndGet(insertPlan.getMeasurements().length);
     StorageGroupProcessor storageGroupProcessor;
     try {
       storageGroupProcessor = getProcessor(insertPlan.getDeviceId());
@@ -321,4 +340,58 @@ public class StorageEngine implements IService {
     return true;
   }
 
+  @Override
+  public Map<String, TSRecord> getAllStatisticsValue() {
+    long curTime = System.currentTimeMillis();
+    TSRecord tsRecord = StatMonitor
+        .convertToTSRecord(getStatParamsHashMap(), METRIC_PREFIX,
+            curTime);
+    HashMap<String, TSRecord> ret = new HashMap<>();
+    ret.put(METRIC_PREFIX, tsRecord);
+    return ret;
+  }
+
+  @Override
+  public void registerStatMetadata() {
+    Map<String, String> hashMap = new HashMap<>();
+    for (FileNodeManagerStatConstants kind : FileNodeManagerStatConstants.values()) {
+      String seriesPath = METRIC_PREFIX
+          + MonitorConstants.MONITOR_PATH_SEPARATOR
+          + kind.name();
+      hashMap.put(seriesPath, MonitorConstants.DATA_TYPE_INT64);
+      Path path = new Path(seriesPath);
+      try {
+        addTimeSeries(path, TSDataType.valueOf(MonitorConstants.DATA_TYPE_INT64),
+            TSEncoding.valueOf("RLE"), CompressionType.valueOf(TSFileConfig.compressor),
+            Collections.emptyMap());
+      } catch (StorageEngineException e) {
+        logger.error("Register File Size Stats into storageEngine Failed.", e);
+      }
+    }
+    StatMonitor.getInstance().registerStatStorageGroup(hashMap);
+  }
+
+  @Override
+  public List<String> getAllPathForStatistic() {
+    List<String> list = new ArrayList<>();
+    for (FileNodeManagerStatConstants kind : MonitorConstants.FileNodeManagerStatConstants.values()) {
+      list.add(
+          METRIC_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR
+              + kind.name());
+    }
+    return list;
+  }
+
+  @Override
+  public Map<String, AtomicLong> getStatParamsHashMap() {
+    Map<FileNodeManagerStatConstants, Long> fileSizeMap = new EnumMap<>(FileNodeManagerStatConstants.class);
+    fileSizeMap.put(FileNodeManagerStatConstants.TOTAL_POINTS_SUCCESS, totalInsertPoint.get());
+    fileSizeMap.put(FileNodeManagerStatConstants.TOTAL_POINTS_FAIL, 0L);
+    Map<String, AtomicLong> statParamsMap = new HashMap<>();
+    for (FileNodeManagerStatConstants kind : MonitorConstants.FileNodeManagerStatConstants.values()) {
+      statParamsMap.put(kind.name(), new AtomicLong(fileSizeMap.get(kind)));
+    }
+    return statParamsMap;
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java b/server/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java
index 469a374..3ea3631 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java
@@ -40,7 +40,7 @@ public class MonitorConstants {
   public static final String FILE_SIZE_STORAGE_GROUP_NAME = STAT_STORAGE_GROUP_PREFIX
       + MONITOR_PATH_SEPARATOR + FILE_SIZE;
   // statistic for insert module
-  static final String FILE_NODE_MANAGER_PATH = "write.global";
+  private static final String FILE_NODE_MANAGER_PATH = "write.global";
   public static final String FILE_NODE_PATH = "write";
   /**
    * Stat information.
@@ -58,7 +58,7 @@ public class MonitorConstants {
     HashMap<String, AtomicLong> hashMap = new HashMap<>();
     switch (constantsType) {
       case FILENODE_PROCESSOR_CONST:
-        for (FileNodeProcessorStatConstants statConstant : FileNodeProcessorStatConstants
+        for (RequestConstants statConstant : RequestConstants
             .values()) {
           hashMap.put(statConstant.name(), new AtomicLong(0));
         }
@@ -79,11 +79,11 @@ public class MonitorConstants {
   }
 
   public enum FileNodeManagerStatConstants {
-    TOTAL_POINTS, TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL
+    TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL
   }
 
-  public enum FileNodeProcessorStatConstants {
-    TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL
+  public enum RequestConstants {
+    TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL
   }
 
   public enum OsStatConstants {
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index 30ce3ea..9d8012c 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -35,8 +35,6 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants;
-import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeProcessorStatConstants;
 import org.apache.iotdb.db.monitor.collector.FileSize;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.service.IService;
@@ -95,12 +93,12 @@ public class StatMonitor implements IService {
   }
 
   private void initTemporaryStatList() {
-    for (FileNodeManagerStatConstants constants : FileNodeManagerStatConstants.values()) {
-      temporaryStatList.add(constants.name());
-    }
-    for (FileNodeProcessorStatConstants constants : FileNodeProcessorStatConstants.values()) {
-      temporaryStatList.add(constants.name());
-    }
+//    for (FileNodeManagerStatConstants constants : FileNodeManagerStatConstants.values()) {
+//      temporaryStatList.add(constants.name());
+//    }
+//    for (RequestConstants constants : RequestConstants.values()) {
+//      temporaryStatList.add(constants.name());
+//    }
   }
 
   public static StatMonitor getInstance() {
@@ -138,18 +136,6 @@ public class StatMonitor implements IService {
     return numInsertError.get();
   }
 
-  void registerStatStorageGroup() {
-    MManager mManager = MManager.getInstance();
-    String prefix = MonitorConstants.STAT_STORAGE_GROUP_PREFIX;
-    try {
-      if (!mManager.pathExist(prefix)) {
-        mManager.setStorageLevelToMTree(prefix);
-      }
-    } catch (Exception e) {
-      logger.error("MManager cannot set storage level to MTree.", e);
-    }
-  }
-
   /**
    * register monitor statistics time series metadata into MManager.
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java b/server/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java
index 01d4b52..840f103 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.monitor.IStatistic;
 import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants;
 import org.apache.iotdb.db.monitor.MonitorConstants.FileSizeConstants;
 import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 6d871e3..a56d5f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -80,13 +80,6 @@ public class IoTDB implements IoTDBMBean {
     IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
     IoTDBDescriptor.getInstance().getConfig().setEnableWal(enableWAL);
 
-    // When registering statMonitor, we should start recovering some statistics
-    // with latest values stored
-    // Warn: registMonitor() method should be called after systemDataRecovery()
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
-      StatMonitor.getInstance().recovery();
-    }
-
     initMManager();
     registerManager.register(StorageEngine.getInstance());
     registerManager.register(MultiFileLogNodeManager.getInstance());
@@ -100,6 +93,13 @@ public class IoTDB implements IoTDBMBean {
 
     JMXService.registerMBean(getInstance(), mbeanName);
 
+    // When registering statMonitor, we should start recovering some statistics
+    // with latest values stored
+    // Warn: registMonitor() method should be called after systemDataRecovery()
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+      StatMonitor.getInstance().recovery();
+    }
+
     logger.info("IoTDB is set up.");
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 4aa0a9e..79de7ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -28,6 +28,8 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +57,11 @@ import org.apache.iotdb.db.exception.qp.IllegalASTFormatException;
 import org.apache.iotdb.db.exception.qp.QueryProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.Metadata;
+import org.apache.iotdb.db.monitor.IStatistic;
+import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants;
+import org.apache.iotdb.db.monitor.MonitorConstants.RequestConstants;
+import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.qp.QueryProcessor;
 import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -92,11 +99,15 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
 import org.apache.iotdb.service.rpc.thrift.TS_Status;
 import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.thrift.TException;
 import org.apache.thrift.server.ServerContext;
 import org.slf4j.Logger;
@@ -106,11 +117,14 @@ import org.slf4j.LoggerFactory;
  * Thrift RPC implementation at server side.
  */
 
-public class TSServiceImpl implements TSIService.Iface, ServerContext {
+public class TSServiceImpl implements TSIService.Iface, ServerContext, IStatistic {
 
   private static final Logger logger = LoggerFactory.getLogger(TSServiceImpl.class);
   private static final String INFO_NOT_LOGIN = "{}: Not login.";
   private static final String ERROR_NOT_LOGIN = "Not login";
+  private static AtomicLong totalRequestNum = new AtomicLong(0);
+  private static final String METRIC_PREFIX = MonitorConstants.STAT_STORAGE_DELTA_NAME + ".request";
+  private StorageEngine storageEngine;
 
   protected QueryProcessor processor;
   // Record the username for every rpc connection. Username.get() is null if
@@ -129,6 +143,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
   public TSServiceImpl() throws IOException {
     processor = new QueryProcessor(new QueryProcessExecutor());
+    storageEngine = StorageEngine.getInstance();
+
+    if (config.isEnableStatMonitor()) {
+      StatMonitor statMonitor = StatMonitor.getInstance();
+      registerStatMetadata();
+      statMonitor.registerStatistics(METRIC_PREFIX, this);
+    }
   }
 
   @Override
@@ -411,6 +432,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   @Override
   public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
     long t1 = System.currentTimeMillis();
+    totalRequestNum.incrementAndGet();
     String currStmt = null;
     List<Integer> result = new ArrayList<>();
     try {
@@ -931,5 +953,59 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   public long requestStatementId() {
     return globalStmtId.incrementAndGet();
   }
+
+  @Override
+  public Map<String, TSRecord> getAllStatisticsValue() {
+    long curTime = System.currentTimeMillis();
+    TSRecord tsRecord = StatMonitor
+        .convertToTSRecord(getStatParamsHashMap(), METRIC_PREFIX,
+            curTime);
+    HashMap<String, TSRecord> ret = new HashMap<>();
+    ret.put(METRIC_PREFIX, tsRecord);
+    return ret;
+  }
+
+  @Override
+  public void registerStatMetadata() {
+    Map<String, String> hashMap = new HashMap<>();
+    for (RequestConstants kind : RequestConstants.values()) {
+      String seriesPath = METRIC_PREFIX
+          + MonitorConstants.MONITOR_PATH_SEPARATOR
+          + kind.name();
+      hashMap.put(seriesPath, MonitorConstants.DATA_TYPE_INT64);
+      Path path = new Path(seriesPath);
+      try {
+        storageEngine.addTimeSeries(path, TSDataType.valueOf(MonitorConstants.DATA_TYPE_INT64),
+            TSEncoding.valueOf("RLE"), CompressionType.valueOf(TSFileConfig.compressor),
+            Collections.emptyMap());
+      } catch (StorageEngineException e) {
+        logger.error("Register File Size Stats into storageEngine Failed.", e);
+      }
+    }
+    StatMonitor.getInstance().registerStatStorageGroup(hashMap);
+  }
+
+  @Override
+  public List<String> getAllPathForStatistic() {
+    List<String> list = new ArrayList<>();
+    for (RequestConstants kind : MonitorConstants.RequestConstants.values()) {
+      list.add(
+          METRIC_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR
+              + kind.name());
+    }
+    return list;
+  }
+
+  @Override
+  public Map<String, AtomicLong> getStatParamsHashMap() {
+    Map<RequestConstants, Long> fileSizeMap = new EnumMap<>(RequestConstants.class);
+    fileSizeMap.put(RequestConstants.TOTAL_REQ_SUCCESS, totalRequestNum.get());
+    fileSizeMap.put(RequestConstants.TOTAL_REQ_FAIL, 0L);
+    Map<String, AtomicLong> statParamsMap = new HashMap<>();
+    for (RequestConstants kind : MonitorConstants.RequestConstants.values()) {
+      statParamsMap.put(kind.name(), new AtomicLong(fileSizeMap.get(kind)));
+    }
+    return statParamsMap;
+  }
 }