You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by li...@apache.org on 2019/11/13 06:46:18 UTC

[incubator-iotdb] branch dynamic_config created (now 73c3261)

This is an automated email from the ASF dual-hosted git repository.

liurui pushed a change to branch dynamic_config
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 73c3261  feat(ActiveTimeSeriesCounter): add ActiveTimeSeriesCounter for memtableSize estimation

This branch includes the following new commits:

     new 73c3261  feat(ActiveTimeSeriesCounter): add ActiveTimeSeriesCounter for memtableSize estimation

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: feat(ActiveTimeSeriesCounter): add ActiveTimeSeriesCounter for memtableSize estimation

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liurui pushed a commit to branch dynamic_config
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 73c32611e4676b33f579b321a4f79c317c6cf104
Author: liuruiyiyang <24...@qq.com>
AuthorDate: Wed Nov 13 14:45:45 2019 +0800

    feat(ActiveTimeSeriesCounter): add ActiveTimeSeriesCounter for memtableSize estimation
---
 server/pom.xml                                     |  5 ++
 .../db/conf/adapter/ActiveTimeSeriesCounter.java   | 87 ++++++++++++++++++++++
 .../db/conf/adapter/IActiveTimeSeriesCounter.java  | 61 +++++++++++++++
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  4 +
 .../db/engine/storagegroup/TsFileProcessor.java    | 25 ++-----
 .../org/apache/iotdb/db/metadata/MManager.java     |  2 +
 6 files changed, 165 insertions(+), 19 deletions(-)

diff --git a/server/pom.xml b/server/pom.xml
index 3dff2e7..643e67f 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -116,6 +116,11 @@
             <artifactId>jfreechart</artifactId>
             <version>1.5.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.clearspring.analytics</groupId>
+            <artifactId>stream</artifactId>
+            <version>2.9.5</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java b/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java
new file mode 100644
index 0000000..87f35a1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.conf.adapter;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Estimates, per storage group, the number of distinct time series offered since the storage
+ * group's counter was last reset, and derives each storage group's share of the total.
+ *
+ * <p>One HyperLogLog sketch is kept per storage group, so all counts are approximate.
+ */
+public class ActiveTimeSeriesCounter implements IActiveTimeSeriesCounter {
+
+  /**
+   * Map[StorageGroup, HyperLogLogCounter]
+   */
+  private static final Map<String, HyperLogLog> storageGroupHllMap = new ConcurrentHashMap<>();
+
+  /**
+   * Map[StorageGroup, ActiveTimeSeriesRatio]
+   */
+  private static final Map<String, Double> activeRatioMap = new ConcurrentHashMap<>();
+
+  /**
+   * LOG2M decides the precision of the HyperLogLog algorithm
+   */
+  private static final int LOG2M = 13;
+
+  /**
+   * Separator between device and measurement in the counted key. Without it different pairs can
+   * collide, e.g. ("root.sg.d", "ab") and ("root.sg.da", "b") would both become "root.sg.dab".
+   */
+  private static final String PATH_SEPARATOR = ".";
+
+  /**
+   * Create an empty counter and reset the active ratio to 0 for the given storage group.
+   */
+  @Override
+  public void init(String storageGroup) {
+    storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M));
+    activeRatioMap.put(storageGroup, 0D);
+  }
+
+  /**
+   * Register a time series. A counter is created on demand when the storage group has none (e.g.
+   * never initialized, or already deleted) instead of throwing a
+   * NullPointerException.
+   */
+  @Override
+  public void offer(String storageGroup, String device, String measurement) {
+    storageGroupHllMap.computeIfAbsent(storageGroup, sg -> new HyperLogLog(LOG2M))
+        .offer(device + PATH_SEPARATOR + measurement);
+  }
+
+  /**
+   * Recompute every storage group's active ratio as its estimated cardinality divided by the sum
+   * of all estimated cardinalities (0 when nothing was counted), then reset the counter of the
+   * given storage group. Each cardinality is estimated exactly once, so all ratios come from the
+   * same snapshot even if series are offered concurrently.
+   */
+  @Override
+  public void updateActiveRatio(String storageGroup) {
+    Map<String, Long> cardinalities = new HashMap<>();
+    double totalActiveTsNum = 0;
+    for (Map.Entry<String, HyperLogLog> entry : storageGroupHllMap.entrySet()) {
+      long cardinality = entry.getValue().cardinality();
+      cardinalities.put(entry.getKey(), cardinality);
+      totalActiveTsNum += cardinality;
+    }
+    for (Map.Entry<String, Long> entry : cardinalities.entrySet()) {
+      double activeRatio = 0;
+      if (totalActiveTsNum > 0) {
+        activeRatio = entry.getValue() / totalActiveTsNum;
+      }
+      activeRatioMap.put(entry.getKey(), activeRatio);
+    }
+    // start a new counting window for the given storage group
+    storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M));
+  }
+
+  /**
+   * Get the active ratio of the given storage group, or 0 when it has none (not initialized yet or
+   * already deleted) instead of throwing a NullPointerException while unboxing.
+   */
+  @Override
+  public double getActiveRatio(String storageGroup) {
+    return activeRatioMap.getOrDefault(storageGroup, 0D);
+  }
+
+  /**
+   * Remove both the counter and the active ratio of the given storage group.
+   */
+  @Override
+  public void delete(String storageGroup) {
+    storageGroupHllMap.remove(storageGroup);
+    activeRatioMap.remove(storageGroup);
+  }
+
+  /**
+   * Initialization-on-demand holder of the singleton instance.
+   */
+  private static class ActiveTimeSeriesCounterHolder {
+    private static final ActiveTimeSeriesCounter INSTANCE = new ActiveTimeSeriesCounter();
+  }
+
+  /**
+   * @return the shared ActiveTimeSeriesCounter instance
+   */
+  public static ActiveTimeSeriesCounter getInstance() {
+    return ActiveTimeSeriesCounterHolder.INSTANCE;
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/adapter/IActiveTimeSeriesCounter.java b/server/src/main/java/org/apache/iotdb/db/conf/adapter/IActiveTimeSeriesCounter.java
new file mode 100644
index 0000000..43bc10c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/conf/adapter/IActiveTimeSeriesCounter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.conf.adapter;
+
+public interface IActiveTimeSeriesCounter {
+
+  /**
+   * Start counting the active time series of the given storage group with an empty counter.
+   *
+   * @param storageGroup the storage group to start counting for
+   */
+  void init(String storageGroup);
+
+  /**
+   * Register a time series as active; the counter estimates how many distinct series are offered.
+   *
+   * @param storageGroup the storage group name of the time series
+   * @param device the device name of the time series
+   * @param measurement the measurement (sensor) name of the time series
+   */
+  void offer(String storageGroup, String device, String measurement);
+
+  /**
+   * Recompute the active ratio of every storage group, then reset the given group's counter.
+   *
+   * @param storageGroup the storage group whose counter is reset after the ratios are updated
+   */
+  void updateActiveRatio(String storageGroup);
+
+  /**
+   * Get the proportion of all counted active time series that belong to the given storage group.
+   *
+   * @param storageGroup the storage group to query
+   * @return the active time series proportion of the given storage group, between 0 and 1
+   */
+  double getActiveRatio(String storageGroup);
+
+  /**
+   * Delete the counter and the active ratio of the given storage group.
+   *
+   * @param storageGroup the storage group whose counter and ratio are removed
+   */
+  void delete(String storageGroup);
+
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 66d8108..9918470 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -18,6 +18,7 @@ import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
@@ -78,9 +79,12 @@ public class MemTableFlushTask {
         TVList tvList = series.getSortedTVList();
         sortTime += System.currentTimeMillis() - startTime;
         encodingTaskQueue.add(new Pair<>(tvList, desc));
+        // register active time series to the ActiveTimeSeriesCounter
+        ActiveTimeSeriesCounter.getInstance().offer(storageGroup, deviceId, measurementId);
       }
       encodingTaskQueue.add(new EndChunkGroupIoTask(memTable.getVersion()));
     }
+    ActiveTimeSeriesCounter.getInstance().updateActiveRatio(storageGroup);
     noMoreEncodingTask = true;
     logger.debug(
         "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index bfc4a59..eec8828 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -32,6 +32,7 @@ import java.util.function.Supplier;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.conf.adapter.CompressionRatio;
 import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
 import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -237,29 +238,15 @@ public class TsFileProcessor {
    * <p>In the dynamic parameter adjustment module{@link IoTDBConfigDynamicAdapter}, it calculated
    * the average size of each metatable{@link IoTDBConfigDynamicAdapter#tryToAdaptParameters()}.
    * However, considering that the number of timeseries between storage groups may vary greatly,
-   * it's appropriate to judge whether to flush the memtable according to the average memtable size.
-   * We need to adjust it according to the number of timeseries in a specific storage group.
-   *
-   * Abbreviation of parameters:
-   *
-   * 1 memtableSize: m
-   * 2 maxMemTableNum: Nm
-   * 3 SeriesNumber: Ns
-   * 4 chunkSizeThreshold: Sc
-   * 5 Total timeseries number: Nts  {@link IoTDBConfigDynamicAdapter#totalTimeseries}
-   * 6 MemTable Number for Each SG: Nmes  {@link IoTDBConstant#MEMTABLE_NUM_IN_EACH_STORAGE_GROUP}
-   *
-   * <p>The equation: Σ(Ns * Sc) * Nmes = m * Nm  ==> Σ(Ns) * Sc * Nmes = m * Nm ==> Sc = m * Nm / Nmes / Nts
-   * <p>Note: Σ means the sum of storage groups , so Nts = ΣNs
+   * it's inappropriate to decide whether to flush a memtable by the average memtable size alone.
+   * Instead, the threshold is scaled by this storage group's share of all active timeseries.
    *
    */
   private long getMemtableSizeThresholdBasedOnSeriesNum() {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    return IoTDBConfigDynamicAdapter.getInstance().getTotalTimeseries() == 0 ? config
-        .getMemtableSizeThreshold() :
-        config.getMemtableSizeThreshold() * config.getMaxMemtableNumber()
-            / IoTDBConstant.MEMTABLE_NUM_IN_EACH_STORAGE_GROUP / IoTDBConfigDynamicAdapter.getInstance()
-            .getTotalTimeseries() * MManager.getInstance().getSeriesNumber(storageGroupName);
+    long memTableSize = (long) (config.getMemtableSizeThreshold() * config.getMaxMemtableNumber()
+        / IoTDBConstant.MEMTABLE_NUM_IN_EACH_STORAGE_GROUP * ActiveTimeSeriesCounter.getInstance().getActiveRatio(storageGroupName));
+    return Math.max(memTableSize, config.getMemtableSizeThreshold()); // floor at configured threshold; NOTE(review): may overcommit memory across SGs - confirm
   }
 
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index cc9922a..d985dd2 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.exception.ConfigAdjusterException;
@@ -596,6 +597,7 @@ public class MManager {
       }
       mgraph.setStorageGroup(path);
       IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
+      ActiveTimeSeriesCounter.getInstance().init(path);
       seriesNumberInStorageGroups.put(path, 0);
       if (writeToLog) {
         BufferedWriter writer = getLogWriter();