You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by li...@apache.org on 2019/11/13 06:46:19 UTC

[incubator-iotdb] 01/01: feat(ActiveTimeSeriesCounter): add ActiveTimeSeriesCounter for memtableSize estimation

This is an automated email from the ASF dual-hosted git repository.

liurui pushed a commit to branch dynamic_config
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 73c32611e4676b33f579b321a4f79c317c6cf104
Author: liuruiyiyang <24...@qq.com>
AuthorDate: Wed Nov 13 14:45:45 2019 +0800

    feat(ActiveTimeSeriesCounter): add ActiveTimeSeriesCounter for memtableSize estimation
---
 server/pom.xml                                     |  5 ++
 .../db/conf/adapter/ActiveTimeSeriesCounter.java   | 87 ++++++++++++++++++++++
 .../db/conf/adapter/IActiveTimeSeriesCounter.java  | 61 +++++++++++++++
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  4 +
 .../db/engine/storagegroup/TsFileProcessor.java    | 25 ++-----
 .../org/apache/iotdb/db/metadata/MManager.java     |  2 +
 6 files changed, 165 insertions(+), 19 deletions(-)

diff --git a/server/pom.xml b/server/pom.xml
index 3dff2e7..643e67f 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -116,6 +116,11 @@
             <artifactId>jfreechart</artifactId>
             <version>1.5.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.clearspring.analytics</groupId>
+            <artifactId>stream</artifactId>
+            <version>2.9.5</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java b/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java
new file mode 100644
index 0000000..87f35a1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.conf.adapter;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Estimates, for every storage group, how many distinct time series were flushed since its
+ * counter was last reset, and derives each storage group's share of all such active series.
+ * TsFileProcessor reads that share to size the memtable of the storage group.
+ */
+public class ActiveTimeSeriesCounter implements IActiveTimeSeriesCounter {
+
+  /**
+   * LOG2M decides the precision of the HyperLogLog algorithm (2^LOG2M registers per counter).
+   */
+  private static final int LOG2M = 13;
+
+  /**
+   * Placed between device and measurement so that different pairs such as ("d1", "s1") and
+   * ("d1s", "1") cannot produce the same key.
+   */
+  private static final String PATH_SEPARATOR = ".";
+
+  /**
+   * Map[StorageGroup, HyperLogLogCounter]
+   */
+  private final Map<String, HyperLogLog> storageGroupHllMap = new ConcurrentHashMap<>();
+
+  /**
+   * Map[StorageGroup, ActiveTimeSeriesRatio], refreshed by updateActiveRatio
+   */
+  private final Map<String, Double> activeRatioMap = new ConcurrentHashMap<>();
+
+  @Override
+  public void init(String storageGroup) {
+    storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M));
+    activeRatioMap.put(storageGroup, 0D);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>A storage group without a counter (never initialized, or already deleted) gets one
+   * created on demand instead of failing the flush with a NullPointerException.
+   */
+  @Override
+  public void offer(String storageGroup, String device, String measurement) {
+    storageGroupHllMap.computeIfAbsent(storageGroup, sg -> new HyperLogLog(LOG2M))
+        .offer(device + PATH_SEPARATOR + measurement);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Each cardinality is estimated once and reused, so all ratios of one update come from
+   * the same snapshot. Synchronized so that concurrent flushes cannot interleave their
+   * compute-publish-reset sequences.
+   */
+  @Override
+  public synchronized void updateActiveRatio(String storageGroup) {
+    Map<String, Long> cardinalities = new HashMap<>();
+    long totalActiveTsNum = 0;
+    for (Map.Entry<String, HyperLogLog> entry : storageGroupHllMap.entrySet()) {
+      long cardinality = entry.getValue().cardinality();
+      cardinalities.put(entry.getKey(), cardinality);
+      totalActiveTsNum += cardinality;
+    }
+    for (Map.Entry<String, Long> entry : cardinalities.entrySet()) {
+      double activeRatio =
+          totalActiveTsNum > 0 ? (double) entry.getValue() / totalActiveTsNum : 0D;
+      activeRatioMap.put(entry.getKey(), activeRatio);
+    }
+    // the flushed storage group starts its next counting period with an empty counter
+    storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M));
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Returns 0 for a storage group without a recorded ratio (e.g. already deleted) instead
+   * of throwing a NullPointerException while unboxing the missing value.
+   */
+  @Override
+  public double getActiveRatio(String storageGroup) {
+    return activeRatioMap.getOrDefault(storageGroup, 0D);
+  }
+
+  @Override
+  public void delete(String storageGroup) {
+    storageGroupHllMap.remove(storageGroup);
+    activeRatioMap.remove(storageGroup);
+  }
+
+  /** Lazily creates the singleton on first use of {@link #getInstance()}. */
+  private static class ActiveTimeSeriesCounterHolder {
+    private static final ActiveTimeSeriesCounter INSTANCE = new ActiveTimeSeriesCounter();
+  }
+
+  /**
+   * @return the shared counter instance used by the flush and memtable-sizing code
+   */
+  public static ActiveTimeSeriesCounter getInstance() {
+    return ActiveTimeSeriesCounterHolder.INSTANCE;
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/adapter/IActiveTimeSeriesCounter.java b/server/src/main/java/org/apache/iotdb/db/conf/adapter/IActiveTimeSeriesCounter.java
new file mode 100644
index 0000000..43bc10c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/conf/adapter/IActiveTimeSeriesCounter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.conf.adapter;
+
+public interface IActiveTimeSeriesCounter {
+
+  /**
+   * Initialize the counter by adding a new HyperLogLog counter for the given storage group;
+   * invoked when the storage group is set, and expected before any other call for it.
+   * @param storageGroup the given storage group to be initialized
+   */
+  void init(String storageGroup);
+
+  /**
+   * Register a time series to the active time series counter. Each distinct series is counted
+   * once per counting period, however often it is offered (estimated, not exact).
+   * @param storageGroup the storage group name of the time series
+   * @param device the device name of the time series
+   * @param measurement the sensor name of the time series
+   */
+  void offer(String storageGroup, String device, String measurement);
+
+  /**
+   * Recompute every storage group's share of all counted active series, then reset the counter
+   * of the given storage group so that its next counting period starts empty.
+   * @param storageGroup whose counter will be refreshed after the update
+   */
+  void updateActiveRatio(String storageGroup);
+
+  /**
+   * Get the active time series number proportion of the given storage group, a value in [0, 1]
+   * taken from the most recent {@link #updateActiveRatio(String)} (0 right after init).
+   * @param storageGroup the storage group to be calculated
+   * @return the active time series number proportion of the given storage group
+   */
+  double getActiveRatio(String storageGroup);
+
+  /**
+   * Delete the counter for the given storage group. NOTE(review): no caller is added in this
+   * change; confirm that deleting a storage group invokes it, or stale counts skew the ratios.
+   * @param storageGroup whose counter will be removed
+   */
+  void delete(String storageGroup);
+
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 66d8108..9918470 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -18,6 +18,7 @@ import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
@@ -78,9 +79,12 @@ public class MemTableFlushTask {
         TVList tvList = series.getSortedTVList();
         sortTime += System.currentTimeMillis() - startTime;
         encodingTaskQueue.add(new Pair<>(tvList, desc));
+        // register active time series to the ActiveTimeSeriesCounter
+        ActiveTimeSeriesCounter.getInstance().offer(storageGroup, deviceId, measurementId);
       }
       encodingTaskQueue.add(new EndChunkGroupIoTask(memTable.getVersion()));
     }
+    ActiveTimeSeriesCounter.getInstance().updateActiveRatio(storageGroup);
     noMoreEncodingTask = true;
     logger.debug(
         "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index bfc4a59..eec8828 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -32,6 +32,7 @@ import java.util.function.Supplier;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.conf.adapter.CompressionRatio;
 import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
 import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -237,29 +238,15 @@ public class TsFileProcessor {
    * <p>In the dynamic parameter adjustment module{@link IoTDBConfigDynamicAdapter}, it calculated
    * the average size of each metatable{@link IoTDBConfigDynamicAdapter#tryToAdaptParameters()}.
    * However, considering that the number of timeseries between storage groups may vary greatly,
-   * it's appropriate to judge whether to flush the memtable according to the average memtable size.
-   * We need to adjust it according to the number of timeseries in a specific storage group.
-   *
-   * Abbreviation of parameters:
-   *
-   * 1 memtableSize: m
-   * 2 maxMemTableNum: Nm
-   * 3 SeriesNumber: Ns
-   * 4 chunkSizeThreshold: Sc
-   * 5 Total timeseries number: Nts  {@link IoTDBConfigDynamicAdapter#totalTimeseries}
-   * 6 MemTable Number for Each SG: Nmes  {@link IoTDBConstant#MEMTABLE_NUM_IN_EACH_STORAGE_GROUP}
-   *
-   * <p>The equation: Σ(Ns * Sc) * Nmes = m * Nm  ==> Σ(Ns) * Sc * Nmes = m * Nm ==> Sc = m * Nm / Nmes / Nts
-   * <p>Note: Σ means the sum of storage groups , so Nts = ΣNs
+   * it's inappropriate to judge whether to flush the memtable according to the average memtable
+   * size. We need to adjust it according to the number of timeseries in a specific storage group.
    *
    */
   private long getMemtableSizeThresholdBasedOnSeriesNum() {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    return IoTDBConfigDynamicAdapter.getInstance().getTotalTimeseries() == 0 ? config
-        .getMemtableSizeThreshold() :
-        config.getMemtableSizeThreshold() * config.getMaxMemtableNumber()
-            / IoTDBConstant.MEMTABLE_NUM_IN_EACH_STORAGE_GROUP / IoTDBConfigDynamicAdapter.getInstance()
-            .getTotalTimeseries() * MManager.getInstance().getSeriesNumber(storageGroupName);
+    long memTableSize = (long) (config.getMemtableSizeThreshold() * config.getMaxMemtableNumber()
+        / IoTDBConstant.MEMTABLE_NUM_IN_EACH_STORAGE_GROUP * ActiveTimeSeriesCounter.getInstance().getActiveRatio(storageGroupName));
+    return Math.max(memTableSize, config.getMemtableSizeThreshold());
   }
 
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index cc9922a..d985dd2 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.exception.ConfigAdjusterException;
@@ -596,6 +597,7 @@ public class MManager {
       }
       mgraph.setStorageGroup(path);
       IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
+      ActiveTimeSeriesCounter.getInstance().init(path);
       seriesNumberInStorageGroups.put(path, 0);
       if (writeToLog) {
         BufferedWriter writer = getLogWriter();