You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by li...@apache.org on 2019/11/14 03:56:45 UTC
[incubator-iotdb] branch dynamic_config updated:
test(ActiveTimeSeriesCounterTest): add ActiveTimeSeriesCounterTest
This is an automated email from the ASF dual-hosted git repository.
liurui pushed a commit to branch dynamic_config
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dynamic_config by this push:
new 393d56b test(ActiveTimeSeriesCounterTest): add ActiveTimeSeriesCounterTest
393d56b is described below
commit 393d56b7523cde4a8f4fd1b8536dff52976d8e41
Author: liuruiyiyang <24...@qq.com>
AuthorDate: Thu Nov 14 11:56:22 2019 +0800
test(ActiveTimeSeriesCounterTest): add ActiveTimeSeriesCounterTest
---
.../db/conf/adapter/ActiveTimeSeriesCounter.java | 53 ++++++++--
.../org/apache/iotdb/db/metadata/MManager.java | 1 +
.../conf/adapter/ActiveTimeSeriesCounterTest.java | 117 +++++++++++++++++++++
3 files changed, 160 insertions(+), 11 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java b/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java
index 87f35a1..50e0f9d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.db.conf.adapter;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ActiveTimeSeriesCounter implements IActiveTimeSeriesCounter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ActiveTimeSeriesCounter.class);
/**
* Map[StorageGroup, HyperLogLogCounter]
*/
@@ -35,14 +39,22 @@ public class ActiveTimeSeriesCounter implements IActiveTimeSeriesCounter {
private static Map<String, Double> activeRatioMap = new ConcurrentHashMap<>();
/**
+ * Map[StorageGroup, ActiveTimeSeriesNumber]
+ */
+ private static Map<String, Long> activeTimeSeriesNumMap = new ConcurrentHashMap<>();
+
+ /**
* LOG2M decide the precision of the HyperLogLog algorithm
*/
private static final int LOG2M = 13;
+ private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
@Override
public void init(String storageGroup) {
storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M));
activeRatioMap.put(storageGroup, 0D);
+ activeTimeSeriesNumMap.put(storageGroup, 0L);
}
@Override
@@ -52,29 +64,48 @@ public class ActiveTimeSeriesCounter implements IActiveTimeSeriesCounter {
@Override
public void updateActiveRatio(String storageGroup) {
- double totalActiveTsNum = 0;
- for (Map.Entry<String, HyperLogLog> entry : storageGroupHllMap.entrySet()) {
- totalActiveTsNum += entry.getValue().cardinality();
- }
- for (Map.Entry<String, HyperLogLog> entry : storageGroupHllMap.entrySet()) {
- double activeRatio = 0;
- if (totalActiveTsNum > 0) {
- activeRatio = entry.getValue().cardinality() / totalActiveTsNum;
+ lock.writeLock().lock();
+ // update the active time series number in the newest memtable to be flushed
+ activeTimeSeriesNumMap.put(storageGroup, storageGroupHllMap.get(storageGroup).cardinality());
+ // initialize the HLL counter
+ storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M));
+ try {
+ double totalActiveTsNum = 0;
+ LOGGER.info("{}: updating active ratio", Thread.currentThread().getName());
+ for (double number : activeTimeSeriesNumMap.values()) {
+ totalActiveTsNum += number;
+ }
+ for (Map.Entry<String, Long> entry : activeTimeSeriesNumMap.entrySet()) {
+ double activeRatio = 0;
+ if (totalActiveTsNum > 0) {
+ activeRatio = entry.getValue() / totalActiveTsNum;
+ }
+ activeRatioMap.put(entry.getKey(), activeRatio);
+ LOGGER.info("{}: storage group {} has active ratio {}", Thread.currentThread().getName(),
+ entry.getKey(), activeRatio);
}
- activeRatioMap.put(entry.getKey(), activeRatio);
+ } finally {
+ lock.writeLock().unlock();
}
- storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M));
}
@Override
public double getActiveRatio(String storageGroup) {
- return activeRatioMap.get(storageGroup);
+ lock.writeLock().lock();
+ double ratio;
+ try {
+ ratio = activeRatioMap.get(storageGroup);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ return ratio;
}
@Override
public void delete(String storageGroup) {
storageGroupHllMap.remove(storageGroup);
activeRatioMap.remove(storageGroup);
+ activeTimeSeriesNumMap.remove(storageGroup);
}
private static class ActiveTimeSeriesCounterHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index d985dd2..baf76b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -648,6 +648,7 @@ public class MManager {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(-1);
mgraph.deleteStorageGroup(delStorageGroup);
seriesNumberInStorageGroups.remove(delStorageGroup);
+ ActiveTimeSeriesCounter.getInstance().delete(delStorageGroup);
} catch (PathException e) {
try {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
diff --git a/server/src/test/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounterTest.java b/server/src/test/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounterTest.java
new file mode 100644
index 0000000..9b51d1e
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounterTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.conf.adapter;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * ActiveTimeSeriesCounter Tester.
+ */
+public class ActiveTimeSeriesCounterTest {
+
+ private static final String TEST_SG_PREFIX = "root.sg_";
+ private static int testStorageGroupNum = 10;
+ private static String[] storageGroups = new String[testStorageGroupNum];
+ private static int[] sensorNum = new int[testStorageGroupNum];
+ private static double totalSeriesNum = 0;
+
+ static {
+ for (int i = 0; i < testStorageGroupNum; i++) {
+ storageGroups[i] = TEST_SG_PREFIX + i;
+ sensorNum[i] = i + 1;
+ totalSeriesNum += sensorNum[i];
+ }
+ }
+
+ @Before
+ public void before() throws Exception {
+ for (String storageGroup : storageGroups) {
+ ActiveTimeSeriesCounter.getInstance().init(storageGroup);
+ }
+ }
+
+ @After
+ public void after() throws Exception {
+ for (String storageGroup : storageGroups) {
+ ActiveTimeSeriesCounter.getInstance().delete(storageGroup);
+ }
+ }
+
+ /**
+ * Method: init(String storageGroup)
+ */
+ @Test
+ public void testInit() throws Exception {
+ for (int i = 0; i < testStorageGroupNum; i++) {
+ assertEquals(0D, ActiveTimeSeriesCounter.getInstance().getActiveRatio(storageGroups[i]), 0.0);
+ }
+ }
+
+ /**
+ * Method: updateActiveRatio(String storageGroup)
+ */
+ @Test
+ public void testUpdateActiveRatio() throws Exception {
+ ExecutorService service = Executors.newFixedThreadPool(storageGroups.length);
+ CountDownLatch finished = new CountDownLatch(storageGroups.length);
+ for (int i = 0; i < storageGroups.length; i++) {
+ service.submit(new OfferThreads(storageGroups[i], sensorNum[i], finished));
+ }
+ finished.await();
+ for (String storageGroup : storageGroups) {
+ ActiveTimeSeriesCounter.getInstance().updateActiveRatio(storageGroup);
+ double sum = 0;
+ for (String s : storageGroups) {
+ sum += ActiveTimeSeriesCounter.getInstance().getActiveRatio(s);
+ }
+ assertEquals(1.0, sum, 0.001);
+ }
+ for (int i = 0; i < storageGroups.length; i++) {
+ double r = ActiveTimeSeriesCounter.getInstance().getActiveRatio(storageGroups[i]);
+ assertEquals(sensorNum[i] / totalSeriesNum, r, 0.001);
+ }
+ }
+
+ private static class OfferThreads implements Runnable {
+ private int sensorNum;
+ private String storageGroup;
+ private CountDownLatch finished;
+
+ private OfferThreads(String storageGroup, int sensorNum, CountDownLatch finished) {
+ this.sensorNum = sensorNum;
+ this.storageGroup = storageGroup;
+ this.finished = finished;
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int j = 0; j < sensorNum; j++) {
+ ActiveTimeSeriesCounter.getInstance().offer(storageGroup, "device_0", "sensor_" + j);
+ }
+ } finally {
+ finished.countDown();
+ }
+ }
+ }
+
+}