You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by li...@apache.org on 2019/11/14 03:56:45 UTC

[incubator-iotdb] branch dynamic_config updated: test(ActiveTimeSeriesCounterTest): add ActiveTimeSeriesCounterTest

This is an automated email from the ASF dual-hosted git repository.

liurui pushed a commit to branch dynamic_config
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dynamic_config by this push:
     new 393d56b  test(ActiveTimeSeriesCounterTest): add ActiveTimeSeriesCounterTest
393d56b is described below

commit 393d56b7523cde4a8f4fd1b8536dff52976d8e41
Author: liuruiyiyang <24...@qq.com>
AuthorDate: Thu Nov 14 11:56:22 2019 +0800

    test(ActiveTimeSeriesCounterTest): add ActiveTimeSeriesCounterTest
---
 .../db/conf/adapter/ActiveTimeSeriesCounter.java   |  53 ++++++++--
 .../org/apache/iotdb/db/metadata/MManager.java     |   1 +
 .../conf/adapter/ActiveTimeSeriesCounterTest.java  | 117 +++++++++++++++++++++
 3 files changed, 160 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java b/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java
index 87f35a1..50e0f9d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.db.conf.adapter;
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ActiveTimeSeriesCounter implements IActiveTimeSeriesCounter {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(ActiveTimeSeriesCounter.class);
   /**
    * Map[StorageGroup, HyperLogLogCounter]
    */
@@ -35,14 +39,22 @@ public class ActiveTimeSeriesCounter implements IActiveTimeSeriesCounter {
   private static Map<String, Double> activeRatioMap = new ConcurrentHashMap<>();
 
   /**
+   * Map[StorageGroup, ActiveTimeSeriesNumber]
+   */
+  private static Map<String, Long> activeTimeSeriesNumMap = new ConcurrentHashMap<>();
+
+  /**
    * LOG2M decide the precision of the HyperLogLog algorithm
    */
   private static final int LOG2M = 13;
 
+  private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
   @Override
   public void init(String storageGroup) { // registers the per-storage-group state in all three maps
     storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M)); // fresh, empty HLL counter
     activeRatioMap.put(storageGroup, 0D); // ratio starts at 0
+    activeTimeSeriesNumMap.put(storageGroup, 0L); // series count starts at 0
   }
 
   @Override
@@ -52,29 +64,48 @@ public class ActiveTimeSeriesCounter implements IActiveTimeSeriesCounter {
 
   @Override
   public void updateActiveRatio(String storageGroup) {
-    double totalActiveTsNum = 0;
-    for (Map.Entry<String, HyperLogLog> entry : storageGroupHllMap.entrySet()) {
-      totalActiveTsNum += entry.getValue().cardinality();
-    }
-    for (Map.Entry<String, HyperLogLog> entry : storageGroupHllMap.entrySet()) {
-      double activeRatio = 0;
-      if (totalActiveTsNum > 0) {
-        activeRatio = entry.getValue().cardinality() / totalActiveTsNum;
+    lock.writeLock().lock();
+    try {
+      // snapshot the count before resetting; inside try so a failure still releases the lock
+      activeTimeSeriesNumMap.put(storageGroup, storageGroupHllMap.get(storageGroup).cardinality());
+      // replace the HLL counter of this storage group with a fresh, empty one
+      storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M));
+      double totalActiveTsNum = 0;
+      LOGGER.info("{}: updating active ratio", Thread.currentThread().getName());
+      for (double number : activeTimeSeriesNumMap.values()) {
+        totalActiveTsNum += number;
+      }
+      for (Map.Entry<String, Long> entry : activeTimeSeriesNumMap.entrySet()) {
+        double activeRatio = 0;
+        if (totalActiveTsNum > 0) {
+          activeRatio = entry.getValue() / totalActiveTsNum;
+        }
+        activeRatioMap.put(entry.getKey(), activeRatio);
+        LOGGER.info("{}: storage group {} has active ratio {}", Thread.currentThread().getName(),
+            entry.getKey(), activeRatio);
       }
-      activeRatioMap.put(entry.getKey(), activeRatio);
+    } finally {
+      lock.writeLock().unlock();
     }
-    storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M));
   }
 
   @Override
   public double getActiveRatio(String storageGroup) {
-    return activeRatioMap.get(storageGroup);
+    lock.readLock().lock(); // read-only access: shared lock, still excluded by the writer
+    double ratio;
+    try {
+      ratio = activeRatioMap.get(storageGroup); // unboxing throws NPE for an unknown group
+    } finally {
+      lock.readLock().unlock();
+    }
+    return ratio;
   }
 
   @Override
   public void delete(String storageGroup) { // drops the group's entry from all three maps
     storageGroupHllMap.remove(storageGroup);
     activeRatioMap.remove(storageGroup);
+    activeTimeSeriesNumMap.remove(storageGroup); // NOTE(review): removal runs without the lock - confirm
   }
 
   private static class ActiveTimeSeriesCounterHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index d985dd2..baf76b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -648,6 +648,7 @@ public class MManager {
           IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(-1);
           mgraph.deleteStorageGroup(delStorageGroup);
           seriesNumberInStorageGroups.remove(delStorageGroup);
+          ActiveTimeSeriesCounter.getInstance().delete(delStorageGroup);
         } catch (PathException e) {
           try {
             IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
diff --git a/server/src/test/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounterTest.java b/server/src/test/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounterTest.java
new file mode 100644
index 0000000..9b51d1e
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounterTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied.  See the License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.conf.adapter;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for ActiveTimeSeriesCounter: initial ratios and ratio updates after concurrent offers.
+ */
+public class ActiveTimeSeriesCounterTest {
+
+  private static final String TEST_SG_PREFIX = "root.sg_";
+  private static int testStorageGroupNum = 10;
+  private static String[] storageGroups = new String[testStorageGroupNum];
+  private static int[] sensorNum = new int[testStorageGroupNum];
+  private static double totalSeriesNum = 0;
+
+  static {
+    for (int i = 0; i < testStorageGroupNum; i++) {
+      storageGroups[i] = TEST_SG_PREFIX + i;
+      sensorNum[i] = i + 1; // storage group i offers i + 1 distinct sensors
+      totalSeriesNum += sensorNum[i];
+    }
+  }
+
+  @Before
+  public void before() throws Exception {
+    for (String storageGroup : storageGroups) {
+      ActiveTimeSeriesCounter.getInstance().init(storageGroup);
+    }
+  }
+
+  @After
+  public void after() throws Exception {
+    for (String storageGroup : storageGroups) {
+      ActiveTimeSeriesCounter.getInstance().delete(storageGroup);
+    }
+  }
+
+  /**
+   * Method: init(String storageGroup). Each initialized storage group must report ratio 0.
+   */
+  @Test
+  public void testInit() throws Exception {
+    for (int i = 0; i < testStorageGroupNum; i++) {
+      assertEquals(0D, ActiveTimeSeriesCounter.getInstance().getActiveRatio(storageGroups[i]), 0.0);
+    }
+  }
+
+  /**
+   * Method: updateActiveRatio(String storageGroup). Ratios must sum to 1 after each update.
+   */
+  @Test
+  public void testUpdateActiveRatio() throws Exception {
+    ExecutorService service = Executors.newFixedThreadPool(storageGroups.length);
+    CountDownLatch finished = new CountDownLatch(storageGroups.length);
+    for (int i = 0; i < storageGroups.length; i++) {
+      service.submit(new OfferThreads(storageGroups[i], sensorNum[i], finished));
+    } // NOTE(review): the executor is never shut down in this class
+    finished.await(); // every task counts down once, so all offers are done past this point
+    for (String storageGroup : storageGroups) {
+      ActiveTimeSeriesCounter.getInstance().updateActiveRatio(storageGroup);
+      double sum = 0;
+      for (String s : storageGroups) {
+        sum += ActiveTimeSeriesCounter.getInstance().getActiveRatio(s);
+      }
+      assertEquals(1.0, sum, 0.001);
+    }
+    for (int i = 0; i < storageGroups.length; i++) {
+      double r = ActiveTimeSeriesCounter.getInstance().getActiveRatio(storageGroups[i]);
+      assertEquals(sensorNum[i] / totalSeriesNum, r, 0.001); // expected share of all offered sensors
+    }
+  }
+
+  private static class OfferThreads implements Runnable { // offers sensorNum sensors for one group
+    private int sensorNum;
+    private String storageGroup;
+    private CountDownLatch finished;
+
+    private OfferThreads(String storageGroup, int sensorNum, CountDownLatch finished) {
+      this.sensorNum = sensorNum;
+      this.storageGroup = storageGroup;
+      this.finished = finished;
+    }
+
+    @Override
+    public void run() {
+      try {
+        for (int j = 0; j < sensorNum; j++) {
+          ActiveTimeSeriesCounter.getInstance().offer(storageGroup, "device_0", "sensor_" + j);
+        }
+      } finally {
+        finished.countDown(); // in finally so the awaiting test thread cannot hang on a failure
+      }
+    }
+  }
+
+}