You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/07/13 03:26:32 UTC

[incubator-iotdb] branch fix_performance_stats created (now 16f0b3b)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch fix_performance_stats
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 16f0b3b  fix bugs in performance stats module; add new functions in its MBean

This branch includes the following new commits:

     new 16f0b3b  fix bugs in performance stats module; add new functions in its MBean

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: fix bugs in performance stats module; add new functions in its MBean

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch fix_performance_stats
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 16f0b3bdf7ff19f463fbca9c8e123e0c61702bd9
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sat Jul 13 11:26:18 2019 +0800

    fix bugs in performance stats module; add new functions in its MBean
---
 .../iotdb/db/cost/statistic/Measurement.java       | 97 ++++++++++++++--------
 .../iotdb/db/cost/statistic/MeasurementMBean.java  | 11 ++-
 .../db/cost/statistic/PerformanceStatTest.java     | 52 ++++++++++--
 3 files changed, 115 insertions(+), 45 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java b/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java
index 83c2cdf..b2c3de2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
@@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory;
  * @see Operation
  */
 public class Measurement implements MeasurementMBean, IService {
+  private static Logger logger = LoggerFactory.getLogger(Measurement.class);
 
   /**
    * queue for async store time latencies.
@@ -98,10 +100,11 @@ public class Measurement implements MeasurementMBean, IService {
   /**
    * future task of display thread and queue consumer thread.
    */
-  private List<Future<?>> futureList;
+  private ScheduledFuture<?> displayFuture;
+  private ScheduledFuture<?> consumeFuture;
 
   /**
-   * lock for change state: start() and stopStatistic().
+   * lock for change state: start() and stopPrintStatistic().
    */
   private ReentrantLock stateChangeLock = new ReentrantLock();
 
@@ -139,10 +142,9 @@ public class Measurement implements MeasurementMBean, IService {
         operationHistogram[operation.ordinal()][i] = 0;
       }
     }
-
+    logger.info("start measurement stats module...");
     service = IoTDBThreadPoolFactory.newScheduledThreadPool(
         2, ThreadName.TIME_COST_STATSTIC.getName());
-    futureList = new ArrayList<>();
   }
 
   public boolean addOperationLatency(Operation op, long startTime) {
@@ -153,20 +155,19 @@ public class Measurement implements MeasurementMBean, IService {
   }
 
   @Override
-  public void startContinuousStatistics() {
+  public void startStatistics() {
     stateChangeLock.lock();
     try {
       if (isEnableStat) {
         return;
       }
       isEnableStat = true;
-      futureList.clear();
-      Future future = service.scheduleWithFixedDelay(
-          new Measurement.DisplayRunnable(), 20, displayIntervalInMs, TimeUnit.MILLISECONDS);
-      futureList.add(future);
-      futureList.add(service.schedule(new QueueConsumerThread(), 0, TimeUnit.MILLISECONDS));
-
-
+      if (consumeFuture != null) {
+        consumeFuture = service.scheduleWithFixedDelay(
+            new Measurement.DisplayRunnable(), 20, displayIntervalInMs, TimeUnit.MILLISECONDS);
+      } else {
+        logger.info("The consuming task in measurement stat module is already running...");
+      }
     } catch (Exception e) {
       LOGGER.error("Find error when start performance statistic thread, because {}", e);
     } finally {
@@ -175,20 +176,17 @@ public class Measurement implements MeasurementMBean, IService {
   }
 
   @Override
-  public void startOneTimeStatistics() {
+  public void startContinuousPrintStatistics() {
     stateChangeLock.lock();
     try {
       if (isEnableStat) {
         return;
       }
       isEnableStat = true;
-      futureList.clear();
-      futureList.add(service.schedule(new QueueConsumerThread(), 10, TimeUnit.MILLISECONDS));
-      Future future = service.schedule(() -> {
-        showMeasurements();
-        stopStatistic();
-      }, displayIntervalInMs, TimeUnit.MILLISECONDS);
-      futureList.add(future);
+      if (displayFuture != null) {
+        displayFuture = service.scheduleWithFixedDelay(
+            new Measurement.DisplayRunnable(), 20, displayIntervalInMs, TimeUnit.MILLISECONDS);
+      }
     } catch (Exception e) {
       LOGGER.error("Find error when start performance statistic thread, because {}", e);
     } finally {
@@ -197,21 +195,37 @@ public class Measurement implements MeasurementMBean, IService {
   }
 
   @Override
-  public void stopStatistic() {
+  public void startPrintStatisticsOnce() {
+    showMeasurements();
+  }
+
+
+  @Override
+  public void stopPrintStatistic() {
     stateChangeLock.lock();
     try {
       if (!isEnableStat) {
         return;
       }
-      isEnableStat = false;
-      for (Future future : futureList) {
-        if (future != null) {
-          future.cancel(true);
+      displayFuture = cancelFuture(displayFuture);
+    } catch (Exception e) {
+      LOGGER.error("Find error when stopPrintStatistic time cost statstic thread, because {}", e);
+    } finally {
+      stateChangeLock.unlock();
+    }
+  }
 
-        }
+  public void stopStatistic() {
+    stateChangeLock.lock();
+    try {
+      if (!isEnableStat) {
+        return;
       }
+      isEnableStat = false;
+      displayFuture = cancelFuture(displayFuture);
+      consumeFuture = cancelFuture(consumeFuture);
     } catch (Exception e) {
-      LOGGER.error("Find error when stopStatistic time cost statstic thread, because {}", e);
+      LOGGER.error("Find error when stopPrintStatistic time cost statstic thread, because {}", e);
     } finally {
       stateChangeLock.unlock();
     }
@@ -245,12 +259,14 @@ public class Measurement implements MeasurementMBean, IService {
   @Override
   public void start() throws StartupException {
     // start display thread and consumer threads.
+    logger.info("start the consuming task in the measurement stats module...");
+    if (service.isShutdown()) {
+      service = IoTDBThreadPoolFactory.newScheduledThreadPool(
+          2, ThreadName.TIME_COST_STATSTIC.getName());
+    }
+    this.clearStatisticalState();
     if (isEnableStat) {
-      Future future = service.scheduleWithFixedDelay(
-          new Measurement.DisplayRunnable(), 20, displayIntervalInMs, TimeUnit.MILLISECONDS);
-      futureList.add(future);
-      futureList.add(service.schedule(new QueueConsumerThread(), 10, TimeUnit.MILLISECONDS));
-
+      consumeFuture = service.schedule(new QueueConsumerThread(), 0, TimeUnit.MILLISECONDS);
     }
     try {
       JMXService.registerMBean(INSTANCE, mbeanName);
@@ -267,14 +283,15 @@ public class Measurement implements MeasurementMBean, IService {
    */
   @Override
   public void stop() {
+    logger.info("stop measurement stats module...");
     JMXService.deregisterMBean(mbeanName);
     if (service == null || service.isShutdown()) {
       return;
     }
-    stopStatistic();
-    futureList.clear();
     service.shutdownNow();
     try {
+      consumeFuture = cancelFuture(consumeFuture);
+      displayFuture = cancelFuture(displayFuture);
       service.awaitTermination(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       LOGGER.error("Performance statistic service could not be shutdown, {}", e.getMessage());
@@ -283,6 +300,18 @@ public class Measurement implements MeasurementMBean, IService {
     }
   }
 
+  /**
+   *
+   * @param future
+   * @return always return null;
+   */
+  private ScheduledFuture<?> cancelFuture(ScheduledFuture<?> future) {
+    if (future != null) {
+      future.cancel(true);
+    }
+    return null;
+  }
+
   @Override
   public ServiceType getID() {
     return ServiceType.PERFORMANCE_STATISTIC_SERVICE;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/MeasurementMBean.java b/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/MeasurementMBean.java
index 224956f..1f69e76 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/MeasurementMBean.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/MeasurementMBean.java
@@ -24,19 +24,24 @@ import java.util.Map;
 public interface MeasurementMBean {
 
   /**
+   * start calculating the statistic.
+   */
+  void startStatistics();
+
+  /**
    * start display performance statistic every interval of displayIntervalInMs.
    */
-  void startContinuousStatistics();
+  void startContinuousPrintStatistics();
 
   /**
    * start display performance statistic after interval of displayIntervalInMs.
    */
-  void startOneTimeStatistics();
+  void startPrintStatisticsOnce();
 
   /**
    * stop display performance statistic.
    */
-  void stopStatistic();
+  void stopPrintStatistic();
 
   /**
    * clear current stat result, reset statistical state.
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/cost/statistic/PerformanceStatTest.java b/iotdb/src/test/java/org/apache/iotdb/db/cost/statistic/PerformanceStatTest.java
index 6764ffc..776785d 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/cost/statistic/PerformanceStatTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/cost/statistic/PerformanceStatTest.java
@@ -18,7 +18,11 @@
  */
 package org.apache.iotdb.db.cost.statistic;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StartupException;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,6 +31,16 @@ public class PerformanceStatTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PerformanceStatTest.class);
 
+  @Before
+  public void setUp() {
+    IoTDBDescriptor.getInstance().getConfig().setEnablePerformanceStat(true);
+  }
+
+  @After
+  public void tearDown() {
+    IoTDBDescriptor.getInstance().getConfig().setEnablePerformanceStat(false);
+  }
+
   @Test
   public void test() {
     Measurement measurement = Measurement.INSTANCE;
@@ -39,28 +53,28 @@ public class PerformanceStatTest {
     Assert.assertEquals(0L, batchOpCnt);
     try {
       measurement.start();
-      measurement.startContinuousStatistics();
+      measurement.startContinuousPrintStatistics();
       measurement.addOperationLatency(operation, System.currentTimeMillis());
       measurement
           .addOperationLatency(operation, System.currentTimeMillis() - 8000000);
       Thread.currentThread().sleep(1000);
       batchOpCnt = measurement.getOperationCnt()[operation.ordinal()];
       Assert.assertEquals(2L, batchOpCnt);
-      measurement.stopStatistic();
-      measurement.stopStatistic();
-      measurement.stopStatistic();
-      LOGGER.info("After stopStatistic!");
+      measurement.stopPrintStatistic();
+      measurement.stopPrintStatistic();
+      measurement.stopPrintStatistic();
+      LOGGER.info("After stopPrintStatistic!");
       Thread.currentThread().sleep(1000);
       measurement.clearStatisticalState();
       batchOpCnt = measurement.getOperationCnt()[operation.ordinal()];
       Assert.assertEquals(0L, batchOpCnt);
-      measurement.startContinuousStatistics();
+      measurement.startContinuousPrintStatistics();
       LOGGER.info("ReStart!");
       Thread.currentThread().sleep(1000);
-      measurement.startContinuousStatistics();
+      measurement.startContinuousPrintStatistics();
       LOGGER.info("ReStart2!");
       Thread.currentThread().sleep(1000);
-      measurement.stopStatistic();
+      measurement.stopPrintStatistic();
       LOGGER.info("After stopStatistic2!");
     } catch (Exception e) {
       LOGGER.error("find error in stat performance, the message is {}", e.getMessage());
@@ -68,4 +82,26 @@ public class PerformanceStatTest {
       measurement.stop();
     }
   }
+
+  @Test
+  public void testSwith() {
+    Measurement measurement = Measurement.INSTANCE;
+    try {
+      measurement.start();
+      measurement.startStatistics();
+      measurement.startStatistics();
+      measurement.startContinuousPrintStatistics();
+      measurement.stopPrintStatistic();
+      measurement.stopStatistic();
+      measurement.clearStatisticalState();
+      measurement.startPrintStatisticsOnce();
+      measurement.startContinuousPrintStatistics();
+      measurement.startStatistics();
+    } catch (StartupException e) {
+      e.printStackTrace();
+    } finally {
+      measurement.stop();
+    }
+
+  }
 }