You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by li...@apache.org on 2019/11/13 10:22:17 UTC

[incubator-iotdb] branch 0.8.1-quick-fix created (now 4813ef5)

This is an automated email from the ASF dual-hosted git repository.

liurui pushed a change to branch 0.8.1-quick-fix
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 4813ef5  feat(Monitor): add monitor for insert point and request

This branch includes the following new commits:

     new 4813ef5  feat(Monitor): add monitor for insert point and request

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: feat(Monitor): add monitor for insert point and request

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liurui pushed a commit to branch 0.8.1-quick-fix
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 4813ef57db231d38d68bea1f6d968bade259cd31
Author: liuruiyiyang <24...@qq.com>
AuthorDate: Wed Nov 13 18:21:56 2019 +0800

    feat(Monitor): add monitor for insert point and request
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 77 ++++++++++++++++++++-
 .../apache/iotdb/db/monitor/MonitorConstants.java  | 10 +--
 .../org/apache/iotdb/db/monitor/StatMonitor.java   | 26 ++------
 .../iotdb/db/monitor/collector/FileSize.java       |  1 +
 .../java/org/apache/iotdb/db/service/IoTDB.java    | 14 ++--
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 78 +++++++++++++++++++++-
 6 files changed, 171 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 49b3692..166e326 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -20,10 +20,14 @@ package org.apache.iotdb.db.engine;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -36,30 +40,39 @@ import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.StorageEngineFailureException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.monitor.IStatistic;
+import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants;
+import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StorageEngine implements IService {
+public class StorageEngine implements IService, IStatistic {
 
   private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
+  private static final String METRIC_PREFIX = MonitorConstants.STAT_STORAGE_DELTA_NAME;
   /**
    * a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
    * will have a subfolder under the systemDir.
    */
   private final String systemDir;
 
+  private static AtomicLong totalInsertPoint = new AtomicLong(0);
+
   /**
    * storage group name -> storage group processor
    */
@@ -94,6 +107,12 @@ public class StorageEngine implements IService {
       logger.error("init a storage group processor failed. ", e);
       throw new StorageEngineFailureException(e);
     }
+
+    if (config.isEnableStatMonitor()) {
+      StatMonitor statMonitor = StatMonitor.getInstance();
+      registerStatMetadata();
+      statMonitor.registerStatistics(METRIC_PREFIX, this);
+    }
   }
 
   @Override
@@ -153,7 +172,7 @@ public class StorageEngine implements IService {
    * @return true if and only if this insertion succeeds
    */
   public boolean insert(InsertPlan insertPlan) throws StorageEngineException {
-
+    totalInsertPoint.addAndGet(insertPlan.getMeasurements().length);
     StorageGroupProcessor storageGroupProcessor;
     try {
       storageGroupProcessor = getProcessor(insertPlan.getDeviceId());
@@ -321,4 +340,58 @@ public class StorageEngine implements IService {
     return true;
   }
 
+  @Override
+  public Map<String, TSRecord> getAllStatisticsValue() { // Returns the current counters as a single TSRecord, keyed by METRIC_PREFIX.
+    long curTime = System.currentTimeMillis(); // wall-clock time handed to the record conversion below
+    TSRecord tsRecord = StatMonitor
+        .convertToTSRecord(getStatParamsHashMap(), METRIC_PREFIX,
+            curTime);
+    HashMap<String, TSRecord> ret = new HashMap<>();
+    ret.put(METRIC_PREFIX, tsRecord); // the map always holds exactly one entry
+    return ret;
+  }
+
+  @Override
+  public void registerStatMetadata() { // Creates one INT64 series per FileNodeManagerStatConstants value and hands the path->type map to StatMonitor.
+    Map<String, String> hashMap = new HashMap<>(); // series path -> data type name
+    for (FileNodeManagerStatConstants kind : FileNodeManagerStatConstants.values()) {
+      String seriesPath = METRIC_PREFIX
+          + MonitorConstants.MONITOR_PATH_SEPARATOR
+          + kind.name(); // path = prefix + separator + enum constant name
+      hashMap.put(seriesPath, MonitorConstants.DATA_TYPE_INT64);
+      Path path = new Path(seriesPath);
+      try {
+        addTimeSeries(path, TSDataType.valueOf(MonitorConstants.DATA_TYPE_INT64),
+            TSEncoding.valueOf("RLE"), CompressionType.valueOf(TSFileConfig.compressor),
+            Collections.emptyMap());
+      } catch (StorageEngineException e) { // failure is logged only; the loop continues with the next constant
+        logger.error("Register File Size Stats into storageEngine Failed.", e); // NOTE(review): text says "File Size Stats" but these are insert-point series - looks copied, confirm
+      }
+    }
+    StatMonitor.getInstance().registerStatStorageGroup(hashMap); // called even if some addTimeSeries calls failed above
+  }
+
+  @Override
+  public List<String> getAllPathForStatistic() { // Lists the series path of every FileNodeManagerStatConstants value.
+    List<String> list = new ArrayList<>();
+    for (FileNodeManagerStatConstants kind : MonitorConstants.FileNodeManagerStatConstants.values()) {
+      list.add(
+          METRIC_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR
+              + kind.name()); // same prefix + separator + name layout that registerStatMetadata() builds
+    }
+    return list; // order follows the enum's values() order
+  }
+
+  @Override
+  public Map<String, AtomicLong> getStatParamsHashMap() { // Builds a fresh map of enum constant name -> current counter value.
+    Map<FileNodeManagerStatConstants, Long> fileSizeMap = new EnumMap<>(FileNodeManagerStatConstants.class); // holds insert-point counters despite the "fileSize" name
+    fileSizeMap.put(FileNodeManagerStatConstants.TOTAL_POINTS_SUCCESS, totalInsertPoint.get()); // NOTE(review): counter is bumped at the top of insert(), before the write is attempted - confirm "success" is intended
+    fileSizeMap.put(FileNodeManagerStatConstants.TOTAL_POINTS_FAIL, 0L); // NOTE(review): hard-coded 0; failed points are not tracked in this method
+    Map<String, AtomicLong> statParamsMap = new HashMap<>();
+    for (FileNodeManagerStatConstants kind : MonitorConstants.FileNodeManagerStatConstants.values()) {
+      statParamsMap.put(kind.name(), new AtomicLong(fileSizeMap.get(kind))); // new AtomicLong per call: a copy of the value, not the live counter
+    }
+    return statParamsMap;
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java b/server/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java
index 469a374..3ea3631 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java
@@ -40,7 +40,7 @@ public class MonitorConstants {
   public static final String FILE_SIZE_STORAGE_GROUP_NAME = STAT_STORAGE_GROUP_PREFIX
       + MONITOR_PATH_SEPARATOR + FILE_SIZE;
   // statistic for insert module
-  static final String FILE_NODE_MANAGER_PATH = "write.global";
+  private static final String FILE_NODE_MANAGER_PATH = "write.global";
   public static final String FILE_NODE_PATH = "write";
   /**
    * Stat information.
@@ -58,7 +58,7 @@ public class MonitorConstants {
     HashMap<String, AtomicLong> hashMap = new HashMap<>();
     switch (constantsType) {
       case FILENODE_PROCESSOR_CONST:
-        for (FileNodeProcessorStatConstants statConstant : FileNodeProcessorStatConstants
+        for (RequestConstants statConstant : RequestConstants
             .values()) {
           hashMap.put(statConstant.name(), new AtomicLong(0));
         }
@@ -79,11 +79,11 @@ public class MonitorConstants {
   }
 
   public enum FileNodeManagerStatConstants {
-    TOTAL_POINTS, TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL
+    TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL
   }
 
-  public enum FileNodeProcessorStatConstants {
-    TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL
+  public enum RequestConstants {
+    TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL
   }
 
   public enum OsStatConstants {
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index 30ce3ea..9d8012c 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -35,8 +35,6 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants;
-import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeProcessorStatConstants;
 import org.apache.iotdb.db.monitor.collector.FileSize;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.service.IService;
@@ -95,12 +93,12 @@ public class StatMonitor implements IService {
   }
 
   private void initTemporaryStatList() {
-    for (FileNodeManagerStatConstants constants : FileNodeManagerStatConstants.values()) {
-      temporaryStatList.add(constants.name());
-    }
-    for (FileNodeProcessorStatConstants constants : FileNodeProcessorStatConstants.values()) {
-      temporaryStatList.add(constants.name());
-    }
+//    for (FileNodeManagerStatConstants constants : FileNodeManagerStatConstants.values()) {
+//      temporaryStatList.add(constants.name());
+//    }
+//    for (RequestConstants constants : RequestConstants.values()) {
+//      temporaryStatList.add(constants.name());
+//    }
   }
 
   public static StatMonitor getInstance() {
@@ -138,18 +136,6 @@ public class StatMonitor implements IService {
     return numInsertError.get();
   }
 
-  void registerStatStorageGroup() {
-    MManager mManager = MManager.getInstance();
-    String prefix = MonitorConstants.STAT_STORAGE_GROUP_PREFIX;
-    try {
-      if (!mManager.pathExist(prefix)) {
-        mManager.setStorageLevelToMTree(prefix);
-      }
-    } catch (Exception e) {
-      logger.error("MManager cannot set storage level to MTree.", e);
-    }
-  }
-
   /**
    * register monitor statistics time series metadata into MManager.
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java b/server/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java
index 01d4b52..840f103 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.monitor.IStatistic;
 import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants;
 import org.apache.iotdb.db.monitor.MonitorConstants.FileSizeConstants;
 import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 6d871e3..a56d5f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -80,13 +80,6 @@ public class IoTDB implements IoTDBMBean {
     IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
     IoTDBDescriptor.getInstance().getConfig().setEnableWal(enableWAL);
 
-    // When registering statMonitor, we should start recovering some statistics
-    // with latest values stored
-    // Warn: registMonitor() method should be called after systemDataRecovery()
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
-      StatMonitor.getInstance().recovery();
-    }
-
     initMManager();
     registerManager.register(StorageEngine.getInstance());
     registerManager.register(MultiFileLogNodeManager.getInstance());
@@ -100,6 +93,13 @@ public class IoTDB implements IoTDBMBean {
 
     JMXService.registerMBean(getInstance(), mbeanName);
 
+    // When registering statMonitor, we should start recovering some statistics
+    // with latest values stored
+    // Warn: registMonitor() method should be called after systemDataRecovery()
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+      StatMonitor.getInstance().recovery();
+    }
+
     logger.info("IoTDB is set up.");
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 4aa0a9e..79de7ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -28,6 +28,8 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +57,11 @@ import org.apache.iotdb.db.exception.qp.IllegalASTFormatException;
 import org.apache.iotdb.db.exception.qp.QueryProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.Metadata;
+import org.apache.iotdb.db.monitor.IStatistic;
+import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants;
+import org.apache.iotdb.db.monitor.MonitorConstants.RequestConstants;
+import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.qp.QueryProcessor;
 import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -92,11 +99,15 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
 import org.apache.iotdb.service.rpc.thrift.TS_Status;
 import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.thrift.TException;
 import org.apache.thrift.server.ServerContext;
 import org.slf4j.Logger;
@@ -106,11 +117,14 @@ import org.slf4j.LoggerFactory;
  * Thrift RPC implementation at server side.
  */
 
-public class TSServiceImpl implements TSIService.Iface, ServerContext {
+public class TSServiceImpl implements TSIService.Iface, ServerContext, IStatistic {
 
   private static final Logger logger = LoggerFactory.getLogger(TSServiceImpl.class);
   private static final String INFO_NOT_LOGIN = "{}: Not login.";
   private static final String ERROR_NOT_LOGIN = "Not login";
+  private static AtomicLong totalRequestNum = new AtomicLong(0);
+  private static final String METRIC_PREFIX = MonitorConstants.STAT_STORAGE_DELTA_NAME + ".request";
+  private StorageEngine storageEngine;
 
   protected QueryProcessor processor;
   // Record the username for every rpc connection. Username.get() is null if
@@ -129,6 +143,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
   public TSServiceImpl() throws IOException {
     processor = new QueryProcessor(new QueryProcessExecutor());
+    storageEngine = StorageEngine.getInstance();
+
+    if (config.isEnableStatMonitor()) {
+      StatMonitor statMonitor = StatMonitor.getInstance();
+      registerStatMetadata();
+      statMonitor.registerStatistics(METRIC_PREFIX, this);
+    }
   }
 
   @Override
@@ -411,6 +432,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   @Override
   public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
     long t1 = System.currentTimeMillis();
+    totalRequestNum.incrementAndGet();
     String currStmt = null;
     List<Integer> result = new ArrayList<>();
     try {
@@ -931,5 +953,59 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   public long requestStatementId() {
     return globalStmtId.incrementAndGet();
   }
+
+  @Override
+  public Map<String, TSRecord> getAllStatisticsValue() { // Returns the current request counters as a single TSRecord, keyed by METRIC_PREFIX.
+    long curTime = System.currentTimeMillis(); // wall-clock time handed to the record conversion below
+    TSRecord tsRecord = StatMonitor
+        .convertToTSRecord(getStatParamsHashMap(), METRIC_PREFIX,
+            curTime);
+    HashMap<String, TSRecord> ret = new HashMap<>();
+    ret.put(METRIC_PREFIX, tsRecord); // the map always holds exactly one entry
+    return ret;
+  }
+
+  @Override
+  public void registerStatMetadata() { // Creates one INT64 series per RequestConstants value via storageEngine and hands the path->type map to StatMonitor.
+    Map<String, String> hashMap = new HashMap<>(); // series path -> data type name
+    for (RequestConstants kind : RequestConstants.values()) {
+      String seriesPath = METRIC_PREFIX
+          + MonitorConstants.MONITOR_PATH_SEPARATOR
+          + kind.name(); // path = prefix + separator + enum constant name
+      hashMap.put(seriesPath, MonitorConstants.DATA_TYPE_INT64);
+      Path path = new Path(seriesPath);
+      try {
+        storageEngine.addTimeSeries(path, TSDataType.valueOf(MonitorConstants.DATA_TYPE_INT64),
+            TSEncoding.valueOf("RLE"), CompressionType.valueOf(TSFileConfig.compressor),
+            Collections.emptyMap());
+      } catch (StorageEngineException e) { // failure is logged only; the loop continues with the next constant
+        logger.error("Register File Size Stats into storageEngine Failed.", e); // NOTE(review): text says "File Size Stats" but these are request series - looks copied, confirm
+      }
+    }
+    StatMonitor.getInstance().registerStatStorageGroup(hashMap); // called even if some addTimeSeries calls failed above
+  }
+
+  @Override
+  public List<String> getAllPathForStatistic() { // Lists the series path of every RequestConstants value.
+    List<String> list = new ArrayList<>();
+    for (RequestConstants kind : MonitorConstants.RequestConstants.values()) {
+      list.add(
+          METRIC_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR
+              + kind.name()); // same prefix + separator + name layout that registerStatMetadata() builds
+    }
+    return list; // order follows the enum's values() order
+  }
+
+  @Override
+  public Map<String, AtomicLong> getStatParamsHashMap() { // Builds a fresh map of enum constant name -> current counter value.
+    Map<RequestConstants, Long> fileSizeMap = new EnumMap<>(RequestConstants.class); // holds request counters despite the "fileSize" name
+    fileSizeMap.put(RequestConstants.TOTAL_REQ_SUCCESS, totalRequestNum.get()); // NOTE(review): counter is bumped on entry to executeBatchStatement, whatever the outcome - confirm "success" is intended
+    fileSizeMap.put(RequestConstants.TOTAL_REQ_FAIL, 0L); // NOTE(review): hard-coded 0; failed requests are not tracked in this method
+    Map<String, AtomicLong> statParamsMap = new HashMap<>();
+    for (RequestConstants kind : MonitorConstants.RequestConstants.values()) {
+      statParamsMap.put(kind.name(), new AtomicLong(fileSizeMap.get(kind))); // new AtomicLong per call: a copy of the value, not the live counter
+    }
+    return statParamsMap;
+  }
 }