You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by li...@apache.org on 2019/11/13 06:46:19 UTC
[incubator-iotdb] 01/01: feat(ActiveTimeSeriesCounter): add
ActiveTimeSeriesCounter for memtableSize estimation
This is an automated email from the ASF dual-hosted git repository.
liurui pushed a commit to branch dynamic_config
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 73c32611e4676b33f579b321a4f79c317c6cf104
Author: liuruiyiyang <24...@qq.com>
AuthorDate: Wed Nov 13 14:45:45 2019 +0800
feat(ActiveTimeSeriesCounter): add ActiveTimeSeriesCounter for memtableSize estimation
---
server/pom.xml | 5 ++
.../db/conf/adapter/ActiveTimeSeriesCounter.java | 87 ++++++++++++++++++++++
.../db/conf/adapter/IActiveTimeSeriesCounter.java | 61 +++++++++++++++
.../iotdb/db/engine/flush/MemTableFlushTask.java | 4 +
.../db/engine/storagegroup/TsFileProcessor.java | 25 ++-----
.../org/apache/iotdb/db/metadata/MManager.java | 2 +
6 files changed, 165 insertions(+), 19 deletions(-)
diff --git a/server/pom.xml b/server/pom.xml
index 3dff2e7..643e67f 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -116,6 +116,11 @@
<artifactId>jfreechart</artifactId>
<version>1.5.0</version>
</dependency>
+ <dependency>
+ <groupId>com.clearspring.analytics</groupId>
+ <artifactId>stream</artifactId>
+ <version>2.9.5</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java b/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java
new file mode 100644
index 0000000..87f35a1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.conf.adapter;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Approximate counter of active time series, kept per storage group with one HyperLogLog each.
+ * A group's active ratio is its estimated share of the active time series counted in all groups.
+ */
+public class ActiveTimeSeriesCounter implements IActiveTimeSeriesCounter {
+
+  /** Map[StorageGroup, HyperLogLogCounter], reset per group by updateActiveRatio. */
+  private static final Map<String, HyperLogLog> storageGroupHllMap = new ConcurrentHashMap<>();
+
+  /** Map[StorageGroup, ActiveTimeSeriesRatio] */
+  private static final Map<String, Double> activeRatioMap = new ConcurrentHashMap<>();
+
+  /** LOG2M decides the precision of the HyperLogLog algorithm (2^LOG2M registers). */
+  private static final int LOG2M = 13;
+
+  @Override
+  public void init(String storageGroup) {
+    storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M));
+    activeRatioMap.put(storageGroup, 0D);
+  }
+
+  /**
+   * Marks a time series as active. A counter is created on demand for a storage group that was
+   * never initialized, instead of throwing a NullPointerException. The series is keyed by its
+   * full path so that e.g. ("root.sg.d1", "1s") and ("root.sg.d11", "s") are not merged.
+   */
+  @Override
+  public void offer(String storageGroup, String device, String measurement) {
+    storageGroupHllMap.computeIfAbsent(storageGroup, sg -> new HyperLogLog(LOG2M))
+        .offer(device + "." + measurement);
+  }
+
+  @Override
+  public void updateActiveRatio(String storageGroup) {
+    // NOTE(review): other groups' counters may have been reset by their own recent updates, so
+    // their estimates here can be partial; confirm this is acceptable for the ratio.
+    double totalActiveTsNum = 0;
+    for (HyperLogLog hll : storageGroupHllMap.values()) {
+      totalActiveTsNum += hll.cardinality();
+    }
+    for (Map.Entry<String, HyperLogLog> entry : storageGroupHllMap.entrySet()) {
+      double activeRatio = 0;
+      if (totalActiveTsNum > 0) {
+        activeRatio = entry.getValue().cardinality() / totalActiveTsNum;
+      }
+      activeRatioMap.put(entry.getKey(), activeRatio);
+    }
+    // start a fresh counting window for the group whose ratio was just refreshed
+    storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M));
+  }
+
+  /** Returns 0 for an unknown storage group instead of failing on unboxing a null ratio. */
+  @Override
+  public double getActiveRatio(String storageGroup) {
+    return activeRatioMap.getOrDefault(storageGroup, 0D);
+  }
+
+  @Override
+  public void delete(String storageGroup) {
+    storageGroupHllMap.remove(storageGroup);
+    activeRatioMap.remove(storageGroup);
+  }
+
+  private static class ActiveTimeSeriesCounterHolder {
+    private static final ActiveTimeSeriesCounter INSTANCE = new ActiveTimeSeriesCounter();
+  }
+
+  public static ActiveTimeSeriesCounter getInstance() {
+    return ActiveTimeSeriesCounterHolder.INSTANCE;
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/adapter/IActiveTimeSeriesCounter.java b/server/src/main/java/org/apache/iotdb/db/conf/adapter/IActiveTimeSeriesCounter.java
new file mode 100644
index 0000000..43bc10c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/conf/adapter/IActiveTimeSeriesCounter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.conf.adapter;
+
+public interface IActiveTimeSeriesCounter {
+
+  /**
+   * Create an empty active-time-series counter for a storage group.
+   *
+   * @param storageGroup name of the storage group to start counting for
+   */
+  void init(String storageGroup);
+
+  /**
+   * Discard the counter and the active ratio kept for a storage group.
+   *
+   * @param storageGroup name of the storage group being removed
+   */
+  void delete(String storageGroup);
+
+  /**
+   * Mark a time series as active in the counter of its storage group.
+   *
+   * @param storageGroup storage group that owns the time series
+   * @param device device path of the time series
+   * @param measurement measurement (sensor) name of the time series
+   */
+  void offer(String storageGroup, String device, String measurement);
+
+  /**
+   * Recompute the active ratio of every storage group, then reset the given group's counter.
+   *
+   * @param storageGroup storage group whose counter is cleared once the ratios are refreshed
+   */
+  void updateActiveRatio(String storageGroup);
+
+  /**
+   * Return a storage group's share of the active time series counted across all groups.
+   *
+   * @param storageGroup storage group to query
+   * @return the fraction of counted active time series that belong to the storage group
+   */
+  double getActiveRatio(String storageGroup);
+
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 66d8108..9918470 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -18,6 +18,7 @@ import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
@@ -78,9 +79,12 @@ public class MemTableFlushTask {
TVList tvList = series.getSortedTVList();
sortTime += System.currentTimeMillis() - startTime;
encodingTaskQueue.add(new Pair<>(tvList, desc));
+ // register active time series to the ActiveTimeSeriesCounter
+ ActiveTimeSeriesCounter.getInstance().offer(storageGroup, deviceId, measurementId);
}
encodingTaskQueue.add(new EndChunkGroupIoTask(memTable.getVersion()));
}
+ ActiveTimeSeriesCounter.getInstance().updateActiveRatio(storageGroup);
noMoreEncodingTask = true;
logger.debug(
"Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index bfc4a59..eec8828 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -32,6 +32,7 @@ import java.util.function.Supplier;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.conf.adapter.CompressionRatio;
import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -237,29 +238,15 @@ public class TsFileProcessor {
* <p>In the dynamic parameter adjustment module{@link IoTDBConfigDynamicAdapter}, it calculated
* the average size of each metatable{@link IoTDBConfigDynamicAdapter#tryToAdaptParameters()}.
* However, considering that the number of timeseries between storage groups may vary greatly,
- * it's appropriate to judge whether to flush the memtable according to the average memtable size.
- * We need to adjust it according to the number of timeseries in a specific storage group.
- *
- * Abbreviation of parameters:
- *
- * 1 memtableSize: m
- * 2 maxMemTableNum: Nm
- * 3 SeriesNumber: Ns
- * 4 chunkSizeThreshold: Sc
- * 5 Total timeseries number: Nts {@link IoTDBConfigDynamicAdapter#totalTimeseries}
- * 6 MemTable Number for Each SG: Nmes {@link IoTDBConstant#MEMTABLE_NUM_IN_EACH_STORAGE_GROUP}
- *
- * <p>The equation: Σ(Ns * Sc) * Nmes = m * Nm ==> Σ(Ns) * Sc * Nmes = m * Nm ==> Sc = m * Nm / Nmes / Nts
- * <p>Note: Σ means the sum of storage groups , so Nts = ΣNs
+ * it's inappropriate to judge whether to flush the memtable according to the average memtable
+ * size alone. Instead, it is scaled by this storage group's share of active timeseries, with the default threshold as a floor.
*
*/
private long getMemtableSizeThresholdBasedOnSeriesNum() {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- return IoTDBConfigDynamicAdapter.getInstance().getTotalTimeseries() == 0 ? config
- .getMemtableSizeThreshold() :
- config.getMemtableSizeThreshold() * config.getMaxMemtableNumber()
- / IoTDBConstant.MEMTABLE_NUM_IN_EACH_STORAGE_GROUP / IoTDBConfigDynamicAdapter.getInstance()
- .getTotalTimeseries() * MManager.getInstance().getSeriesNumber(storageGroupName);
+ long memTableSize = (long) (config.getMemtableSizeThreshold() * config.getMaxMemtableNumber() // budget split per memtable slot (integer division, presumably), then scaled by this SG's active ratio
+ / IoTDBConstant.MEMTABLE_NUM_IN_EACH_STORAGE_GROUP * ActiveTimeSeriesCounter.getInstance().getActiveRatio(storageGroupName));
+ return Math.max(memTableSize, config.getMemtableSizeThreshold()); // floor: never below the configured memtable size threshold
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index cc9922a..d985dd2 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
@@ -596,6 +597,7 @@ public class MManager {
}
mgraph.setStorageGroup(path);
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
+ ActiveTimeSeriesCounter.getInstance().init(path);
seriesNumberInStorageGroups.put(path, 0);
if (writeToLog) {
BufferedWriter writer = getLogWriter();