You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/05/16 06:51:09 UTC

[iotdb] branch aligned_mem_cal created (now 1c34c9321e)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch aligned_mem_cal
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 1c34c9321e update code

This branch includes the following new commits:

     new 06ee6aa46c fix aligned_mem
     new d2c000216d Merge branch 'master' of https://github.com/apache/iotdb into aligned_mem_cal
     new 2acdf54b4f init
     new 88aed54250 measurementIdCache
     new 1c34c9321e update code

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/05: fix aligned_mem

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch aligned_mem_cal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 06ee6aa46cfb9c58a6cccb0ed5dfd2c4d6186603
Author: HTHou <hh...@outlook.com>
AuthorDate: Wed May 10 11:44:34 2023 +0800

    fix aligned_mem
---
 .../db/engine/storagegroup/TsFileProcessor.java    | 48 +++++++++++++---------
 1 file changed, 29 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index f4f33c1047..107107fca9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable;
 import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk;
 import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -453,14 +454,26 @@ public class TsFileProcessor {
     AlignedWritableMemChunk alignedMemChunk = null;
     // get device id
     IDeviceID deviceID = getDeviceID(deviceId);
+    IWritableMemChunkGroup memChunkGroup = workMemTable.getMemTableMap().get(deviceID);
 
-    if (workMemTable.checkIfChunkDoesNotExist(deviceID, AlignedPath.VECTOR_PLACEHOLDER)) {
+    if (memChunkGroup == null) {
       // ChunkMetadataIncrement
       chunkMetadataIncrement +=
           ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR)
               * dataTypes.length;
       memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
+      for (int i = 0; i < dataTypes.length; i++) {
+        // skip failed Measurements
+        if (dataTypes[i] == null || measurements[i] == null) {
+          continue;
+        }
+        // TEXT data mem size
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+        }
+      }
     } else {
+      alignedMemChunk = ((AlignedWritableMemChunkGroup) memChunkGroup).getAlignedMemChunk();
       // here currentChunkPointNum >= 1
       long currentChunkPointNum =
           workMemTable.getCurrentTVListSize(deviceID, AlignedPath.VECTOR_PLACEHOLDER);
@@ -468,24 +481,21 @@ public class TsFileProcessor {
           (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
               ? AlignedTVList.alignedTvListArrayMemCost(dataTypes)
               : 0;
-      alignedMemChunk =
-          ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceID))
-              .getAlignedMemChunk();
-    }
-    for (int i = 0; i < dataTypes.length; i++) {
-      // skip failed Measurements
-      if (dataTypes[i] == null || measurements[i] == null) {
-        continue;
-      }
-      // extending the column of aligned mem chunk
-      if (alignedMemChunk != null && !alignedMemChunk.containsMeasurement(measurements[i])) {
-        memTableIncrement +=
-            (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
-                * dataTypes[i].getDataTypeSize();
-      }
-      // TEXT data mem size
-      if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
-        textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+      for (int i = 0; i < dataTypes.length; i++) {
+        // skip failed Measurements
+        if (dataTypes[i] == null || measurements[i] == null) {
+          continue;
+        }
+        // extending the column of aligned mem chunk
+        //        if (!alignedMemChunk.containsMeasurement(measurements[i])) {
+        //          memTableIncrement +=
+        //              (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
+        //                  * dataTypes[i].getDataTypeSize();
+        //        }
+        // TEXT data mem size
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+        }
       }
     }
     updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);


[iotdb] 04/05: measurementIdCache

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch aligned_mem_cal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 88aed54250d717db3e945ecd475e9f0af0aef614
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon May 15 09:06:20 2023 +0800

    measurementIdCache
---
 .../metadata/cache/DataNodeMeasurementIdCache.java | 57 ++++++++++++++++++++++
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 25 +++++++++-
 .../apache/iotdb/db/wal/utils/WALWriteUtils.java   |  3 +-
 3 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeMeasurementIdCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeMeasurementIdCache.java
new file mode 100644
index 0000000000..571fbb7a82
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeMeasurementIdCache.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.cache;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+public class DataNodeMeasurementIdCache {
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private final Cache<String, byte[]> measurementIdCache;
+
+  private DataNodeMeasurementIdCache() {
+    measurementIdCache = Caffeine.newBuilder().maximumSize(config.getDevicePathCacheSize()).build();
+  }
+
+  public static DataNodeMeasurementIdCache getInstance() {
+    return DataNodeMeasurementIdCache.DataNodeMeasurementIdCacheHolder.INSTANCE;
+  }
+
+  /** singleton pattern. */
+  private static class DataNodeMeasurementIdCacheHolder {
+    private static final DataNodeMeasurementIdCache INSTANCE = new DataNodeMeasurementIdCache();
+  }
+
+  public boolean contains(String measurementId) {
+    return null != measurementIdCache.getIfPresent(measurementId);
+  }
+
+  public byte[] getBytes(String measurementId) {
+    return measurementIdCache.get(measurementId, String::getBytes);
+  }
+
+  public void cleanUp() {
+    measurementIdCache.cleanUp();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 5d1456935b..5e6f344def 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.metadata.cache.DataNodeMeasurementIdCache;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
@@ -1146,8 +1147,28 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
 
       // check whether measurement is legal according to syntax convention
-      req.setMeasurementsList(
-          PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList()));
+
+      List<List<String>> measurementLists = req.getMeasurementsList();
+      List<List<String>> res = new ArrayList<>();
+      for (List<String> measurements : measurementLists) {
+        List<String> checkedList = new ArrayList<>();
+        for (String measurement : measurements) {
+          if (measurement == null) {
+            checkedList.add(null);
+          } else {
+            if (DataNodeMeasurementIdCache.getInstance().contains(measurement)) {
+              checkedList.add(measurement);
+            } else {
+              String checked = PathUtils.checkAndReturnSingleMeasurement(measurement);
+              DataNodeMeasurementIdCache.getInstance().getBytes(checked);
+              checkedList.add(measurement);
+            }
+          }
+        }
+        res.add(checkedList);
+      }
+
+      req.setMeasurementsList(res);
 
       // Step 1:  transfer from TSInsertRecordsReq to Statement
       InsertRowsStatement statement = StatementGenerator.createStatement(req);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
index 5eb016ced7..ccf23765db 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.wal.utils;
 
+import org.apache.iotdb.db.metadata.cache.DataNodeMeasurementIdCache;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -124,7 +125,7 @@ public class WALWriteUtils {
       return write(NO_BYTE_TO_READ, buffer);
     }
     int len = 0;
-    byte[] bytes = s.getBytes();
+    byte[] bytes = DataNodeMeasurementIdCache.getInstance().getBytes(s);
     len += write(bytes.length, buffer);
     buffer.put(bytes);
     len += bytes.length;


[iotdb] 05/05: update code

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch aligned_mem_cal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1c34c9321e217928cb4144fc36c4462599c1a1c5
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue May 16 09:43:00 2023 +0800

    update code
---
 .../metadata/cache/DataNodeMeasurementIdCache.java |  6 ++++-
 .../metrics/recorder/WritingMetricsManager.java    | 26 +++++++++++++---------
 .../service/thrift/impl/ClientRPCServiceImpl.java  |  4 ++--
 3 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeMeasurementIdCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeMeasurementIdCache.java
index 571fbb7a82..16900e7a30 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeMeasurementIdCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeMeasurementIdCache.java
@@ -44,7 +44,11 @@ public class DataNodeMeasurementIdCache {
   }
 
   public boolean contains(String measurementId) {
-    return null != measurementIdCache.getIfPresent(measurementId);
+    return null != measurementIdCache.get(measurementId, k -> null);
+  }
+
+  public void put(String measurementId) {
+    measurementIdCache.put(measurementId, measurementId.getBytes());
   }
 
   public byte[] getBytes(String measurementId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/WritingMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/WritingMetricsManager.java
index 5decb9b3a4..10391227b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/WritingMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/WritingMetricsManager.java
@@ -26,12 +26,15 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.db.wal.checkpoint.CheckpointType;
+import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
 
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.iotdb.metrics.utils.MetricLevel.DO_NOTHING;
+
 public class WritingMetricsManager {
   public static final WritingMetricsManager INSTANCE = new WritingMetricsManager();
 
@@ -286,16 +289,19 @@ public class WritingMetricsManager {
   }
 
   public void recordSerializeOneWALInfoEntryCost(long costTimeInNanos) {
-    MetricService.getInstance()
-        .timer(
-            costTimeInNanos,
-            TimeUnit.NANOSECONDS,
-            Metric.WAL_COST.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.STAGE.toString(),
-            WritingMetrics.SERIALIZE_WAL_ENTRY,
-            Tag.TYPE.toString(),
-            WritingMetrics.SERIALIZE_ONE_WAL_INFO_ENTRY);
+    if (!DO_NOTHING.equals(
+        MetricConfigDescriptor.getInstance().getMetricConfig().getMetricLevel())) {
+      MetricService.getInstance()
+          .timer(
+              costTimeInNanos,
+              TimeUnit.NANOSECONDS,
+              Metric.WAL_COST.toString(),
+              MetricLevel.IMPORTANT,
+              Tag.STAGE.toString(),
+              WritingMetrics.SERIALIZE_WAL_ENTRY,
+              Tag.TYPE.toString(),
+              WritingMetrics.SERIALIZE_ONE_WAL_INFO_ENTRY);
+    }
   }
 
   public void recordSerializeWALEntryTotalCost(long costTimeInNanos) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 5e6f344def..5cc2161cf8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -1160,8 +1160,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
               checkedList.add(measurement);
             } else {
               String checked = PathUtils.checkAndReturnSingleMeasurement(measurement);
-              DataNodeMeasurementIdCache.getInstance().getBytes(checked);
-              checkedList.add(measurement);
+              DataNodeMeasurementIdCache.getInstance().put(checked);
+              checkedList.add(checked);
             }
           }
         }


[iotdb] 02/05: Merge branch 'master' of https://github.com/apache/iotdb into aligned_mem_cal

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch aligned_mem_cal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d2c000216d7cf99afacf772abd2859805cbca7f8
Merge: 06ee6aa46c 921faa061b
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu May 11 10:00:48 2023 +0800

    Merge branch 'master' of https://github.com/apache/iotdb into aligned_mem_cal

 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  22 ++
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   4 +
 client-py/tests/test_dataframe.py                  |   2 +
 .../confignode/manager/ClusterSchemaManager.java   |  13 +-
 consensus/pom.xml                                  |   2 +-
 docs/UserGuide/Monitor-Alert/Metric-Tool.md        |   8 +-
 docs/UserGuide/QuickStart/WayToGetIoTDB.md         |   2 +-
 docs/UserGuide/Reference/DataNode-Config-Manual.md |   2 +-
 docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md     |  24 +-
 .../UserGuide/Reference/DataNode-Config-Manual.md  |   2 +-
 .../apache/iotdb/db/it/query/IoTDBResultSetIT.java |   2 +
 .../db/it/schema/IoTDBAutoCreateSchemaIT.java      |  17 +-
 .../it/schema/IoTDBCreateAlignedTimeseriesIT.java  |  17 +-
 .../db/it/schema/IoTDBCreateStorageGroupIT.java    |  18 +-
 .../db/it/schema/IoTDBCreateTimeseriesIT.java      |  17 +-
 .../db/it/schema/IoTDBDeactivateTemplateIT.java    |  26 +-
 .../it/schema/IoTDBDeleteAlignedTimeseriesIT.java  |  18 +-
 .../db/it/schema/IoTDBDeleteStorageGroupIT.java    |  17 +-
 .../db/it/schema/IoTDBDeleteTimeseriesIT.java      |  18 +-
 .../iotdb/db/it/schema/IoTDBExtendTemplateIT.java  |  49 +--
 .../iotdb/db/it/schema/IoTDBMetadataFetchIT.java   |  63 ++--
 .../iotdb/db/it/schema/IoTDBSchemaTemplateIT.java  |  49 +--
 .../db/it/schema/IoTDBSortedShowTimeseriesIT.java  |  20 +-
 .../apache/iotdb/db/it/schema/IoTDBTagAlterIT.java |  17 +-
 .../org/apache/iotdb/db/it/schema/IoTDBTagIT.java  |  17 +-
 .../org/apache/iotdb/util/AbstractSchemaIT.java    |  48 ++-
 .../iotdb/zeppelin/it/IoTDBInterpreterIT.java      |  14 +-
 .../metrics/metricsets/jvm/JvmMemoryMetrics.java   |   6 +-
 .../resources/conf/iotdb-common.properties         |   4 +
 .../iotdb/commons/consensus/DataRegionId.java      |   4 -
 .../org/apache/iotdb/commons/path/PartialPath.java |   4 +-
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |   8 +-
 .../plugin/builtin/collector/DefaultCollector.java |  60 ++++
 .../commons/pipe/task/meta/PipeStaticMeta.java     |   4 +-
 .../node/common/AbstractMeasurementMNode.java      |   4 +
 .../schema/node/role/IMeasurementMNode.java        |   2 +
 .../commons/schema/node/utils/IMNodeFactory.java   |   4 +
 .../api/exception/PipeConnectionException.java     |  18 +-
 .../schemaregion/rocksdb/RSchemaRegion.java        |   7 +
 .../rocksdb/mnode/RMeasurementMNode.java           |   5 +
 .../metadata/tagSchemaRegion/TagSchemaRegion.java  |   7 +
 .../resources/conf/iotdb-datanode.properties       |   2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  11 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   4 +
 .../db/engine/cache/TimeSeriesMetadataCache.java   |   7 +-
 .../CompactionMemoryNotEnoughException.java}       |   9 +-
 .../execute/task/CrossSpaceCompactionTask.java     |   6 +-
 .../estimator/AbstractCompactionEstimator.java     |   3 +-
 .../ReadPointCrossCompactionEstimator.java         |   6 +-
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  35 +-
 .../apache/iotdb/db/metadata/MetadataConstant.java |   6 +
 .../db/metadata/cache/DataNodeSchemaCache.java     |  14 +-
 .../metadata/cache/DataNodeSchemaCacheMetrics.java |  17 +-
 .../db/metadata/cache/TimeSeriesSchemaCache.java   |   3 +-
 .../dualkeycache/impl/DualKeyCacheBuilder.java     |  18 +-
 .../dualkeycache/impl/DualKeyCachePolicy.java      |   3 +-
 .../dualkeycache/impl/FIFOCacheEntryManager.java   | 189 ++++++++++
 .../mnode/config/factory/ConfigMNodeFactory.java   |   7 +
 .../mnode/mem/factory/MemMNodeFactory.java         |  14 +
 ...MeasurementMNode.java => LogicalViewMNode.java} |  30 +-
 .../metadata/mnode/mem/impl/LogicalViewSchema.java | 215 +++++++++++
 .../metadata/mnode/mem/impl/MeasurementMNode.java  |   5 +
 .../metadata/mnode/mem/info/LogicalViewInfo.java   | 161 +++++++++
 .../schemafile/factory/CacheMNodeFactory.java      |   7 +
 .../schemafile/impl/CachedMeasurementMNode.java    |   5 +
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  |  10 +-
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |  75 +++-
 .../mtree/snapshot/MemMTreeSnapshotUtil.java       |  48 ++-
 .../plan/schemaregion/SchemaRegionPlanType.java    |   2 +
 .../plan/schemaregion/SchemaRegionPlanVisitor.java |   5 +
 .../impl/SchemaRegionPlanDeserializer.java         |  24 ++
 .../impl/SchemaRegionPlanSerializer.java           |  27 ++
 .../impl/SchemaRegionPlanTxtSerializer.java        |  21 ++
 .../impl/write/CreateLogicalViewPlanImpl.java      |  75 ++++
 .../schemaregion/result/ShowTimeSeriesResult.java  |  14 +-
 .../schemaregion/write/ICreateLogicalViewPlan.java |  66 ++++
 .../metadata/query/info/ITimeSeriesSchemaInfo.java |   6 +-
 .../metadata/rescon/MemSchemaRegionStatistics.java |   8 +-
 .../db/metadata/schemaregion/ISchemaRegion.java    |   5 +
 .../schemaregion/SchemaRegionMemoryImpl.java       |  40 +++
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |   7 +
 .../metadata/template/ClusterTemplateManager.java  |  36 ++
 .../view/viewExpression/ViewExpression.java        | 318 +++++++++++++++++
 .../view/viewExpression/ViewExpressionType.java    |  76 ++++
 .../binary/BinaryViewExpression.java               | 119 +++++++
 .../binary/arithmetic/AdditionViewExpression.java  |  61 ++++
 .../arithmetic/ArithmeticBinaryViewExpression.java |  52 +++
 .../binary/arithmetic/DivisionViewExpression.java  |  61 ++++
 .../binary/arithmetic/ModuloViewExpression.java    |  61 ++++
 .../arithmetic/MultiplicationViewExpression.java   |  63 ++++
 .../arithmetic/SubtractionViewExpression.java      |  61 ++++
 .../compare/CompareBinaryViewExpression.java       |  52 +++
 .../binary/compare/EqualToViewExpression.java      |  57 +++
 .../binary/compare/GreaterEqualViewExpression.java |  57 +++
 .../binary/compare/GreaterThanViewExpression.java  |  57 +++
 .../binary/compare/LessEqualViewExpression.java    |  57 +++
 .../binary/compare/LessThanViewExpression.java     |  57 +++
 .../binary/compare/NonEqualViewExpression.java     |  57 +++
 .../binary/logic/LogicAndViewExpression.java       |  57 +++
 .../binary/logic/LogicBinaryViewExpression.java    |  52 +++
 .../binary/logic/LogicOrViewExpression.java        |  57 +++
 .../viewExpression/leaf/ConstantViewOperand.java   |  98 ++++++
 .../view/viewExpression/leaf/LeafViewOperand.java} |  27 +-
 .../view/viewExpression/leaf/NullViewOperand.java  |  61 ++++
 .../viewExpression/leaf/TimeSeriesViewOperand.java |  87 +++++
 .../viewExpression/leaf/TimestampViewOperand.java  |  72 ++++
 .../multi/FunctionViewExpression.java              | 198 +++++++++++
 .../ternary/BetweenViewExpression.java             | 111 ++++++
 .../ternary/TernaryViewExpression.java             | 105 ++++++
 .../viewExpression/unary/InViewExpression.java     | 106 ++++++
 .../viewExpression/unary/IsNullViewExpression.java |  89 +++++
 .../viewExpression/unary/LikeViewExpression.java   | 163 +++++++++
 .../unary/LogicNotViewExpression.java              |  61 ++++
 .../unary/NegationViewExpression.java              |  61 ++++
 .../unary/RegularViewExpression.java               | 105 ++++++
 .../viewExpression/unary/UnaryViewExpression.java  |  73 ++++
 .../visitor/GetSourcePathsVisitor.java             |  95 +++++
 .../visitor/TransformToExpressionVisitor.java      | 320 +++++++++++++++++
 .../visitor/ViewExpressionVisitor.java             | 219 ++++++++++++
 .../metadata/visitor/SchemaExecutionVisitor.java   |  22 ++
 .../db/mpp/common/header/ColumnHeaderConstant.java |   6 +-
 .../mpp/execution/exchange/SharedTsBlockQueue.java |  12 +-
 .../execution/executor/RegionWriteExecutor.java    |  49 +++
 .../schema/source/TimeSeriesSchemaSource.java      |  14 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  45 +++
 .../visitor/TransformToViewExpressionVisitor.java  | 391 +++++++++++++++++++++
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  86 +++++
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  21 ++
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   6 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   5 +
 .../node/metedata/write/CreateLogicalViewNode.java | 250 +++++++++++++
 .../plan/planner/plan/node/write/InsertNode.java   |   8 +
 .../iotdb/db/mpp/plan/statement/StatementType.java |   2 +
 .../db/mpp/plan/statement/StatementVisitor.java    |   7 +
 .../metadata/CreateLogicalViewStatement.java       | 246 +++++++++++++
 .../db/pipe/agent/plugin/PipePluginAgent.java      |  26 +-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 139 ++++----
 .../PipeCollectorConstant.java}                    |  20 +-
 .../PipeConnectorConstant.java}                    |  12 +-
 .../PipeProcessorConstant.java}                    |  12 +-
 .../core/collector/IoTDBDataRegionCollector.java   |  85 +++++
 ...> PipeHistoricalDataRegionTsFileCollector.java} |  39 +-
 .../realtime/PipeRealtimeDataRegionCollector.java  |  21 +-
 ... => PipeRealtimeDataRegionHybridCollector.java} |  33 +-
 .../connector/PipeConnectorSubtaskLifeCycle.java   |  31 +-
 .../connector/PipeConnectorSubtaskManager.java     |  19 +-
 .../event/view/collector/PipeEventCollector.java   |  59 +++-
 ...anager.java => PipeSubtaskExecutorManager.java} |  12 +-
 .../execution/scheduler/PipeTaskScheduler.java     |  18 +-
 .../org/apache/iotdb/db/pipe/task/PipeBuilder.java |  66 ++++
 .../org/apache/iotdb/db/pipe/task/PipeTask.java    |   9 +-
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |  60 +++-
 .../apache/iotdb/db/pipe/task/PipeTaskManager.java |  96 +++++
 .../iotdb/db/pipe/task/queue/EventSupplier.java    |  25 +-
 .../ListenableBlockingPendingQueue.java}           |  18 +-
 .../db/pipe/task/queue/ListenablePendingQueue.java | 159 +++++++++
 .../ListenableUnblockingPendingQueue.java}         |  18 +-
 .../PendingQueueEmptyToNotEmptyListener.java}      |   9 +-
 .../PendingQueueFullToNotFullListener.java}        |   9 +-
 .../PendingQueueNotEmptyToEmptyListener.java}      |   9 +-
 .../PendingQueueNotFullToFullListener.java}        |   9 +-
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |  83 +++--
 .../db/pipe/task/stage/PipeTaskConnectorStage.java | 100 +-----
 .../db/pipe/task/stage/PipeTaskProcessorStage.java | 105 +++++-
 .../iotdb/db/pipe/task/stage/PipeTaskStage.java    |  95 ++++-
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |  87 +++--
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |  20 +-
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    |   6 +-
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |  20 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   1 +
 .../cache/dualkeycache/DualKeyCacheTest.java       |  17 +
 .../metadata/view/ViewExpressionToStringTest.java  | 183 ++++++++++
 .../collector/CachedSchemaPatternMatcherTest.java  |  48 ++-
 .../core/collector/PipeRealtimeCollectTest.java    |  59 +++-
 .../executor/PipeConnectorSubtaskExecutorTest.java |   8 +-
 .../executor/PipeProcessorSubtaskExecutorTest.java |  10 +-
 176 files changed, 7710 insertions(+), 689 deletions(-)


[iotdb] 03/05: init

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch aligned_mem_cal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2acdf54b4f348d389a500b017c68612dcce61b21
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu May 11 15:53:54 2023 +0800

    init
---
 .../engine/memtable/AlignedWritableMemChunk.java   |  5 ++--
 .../db/engine/storagegroup/TsFileProcessor.java    | 10 ++++----
 .../db/metadata/cache/TimeSeriesSchemaCache.java   |  2 +-
 .../metadata/cache/dualkeycache/IDualKeyCache.java |  2 ++
 .../cache/dualkeycache/impl/DualKeyCacheImpl.java  | 28 ++++++++++++++++++++++
 5 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index dabd3b472d..d947b6a034 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -41,7 +41,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -60,7 +59,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
   private static final Logger LOGGER = LoggerFactory.getLogger(AlignedWritableMemChunk.class);
 
   public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList) {
-    this.measurementIndexMap = new LinkedHashMap<>();
+    this.measurementIndexMap = new HashMap<>();
     List<TSDataType> dataTypeList = new ArrayList<>();
     this.schemaList = schemaList;
     for (int i = 0; i < schemaList.size(); i++) {
@@ -71,7 +70,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
   }
 
   private AlignedWritableMemChunk(List<IMeasurementSchema> schemaList, AlignedTVList list) {
-    this.measurementIndexMap = new LinkedHashMap<>();
+    this.measurementIndexMap = new HashMap<>();
     this.schemaList = schemaList;
     for (int i = 0; i < schemaList.size(); i++) {
       measurementIndexMap.put(schemaList.get(i).getMeasurementId(), i);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 107107fca9..21634f782d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -487,11 +487,11 @@ public class TsFileProcessor {
           continue;
         }
         // extending the column of aligned mem chunk
-        //        if (!alignedMemChunk.containsMeasurement(measurements[i])) {
-        //          memTableIncrement +=
-        //              (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
-        //                  * dataTypes[i].getDataTypeSize();
-        //        }
+        if (!alignedMemChunk.containsMeasurement(measurements[i])) {
+          memTableIncrement +=
+              (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
+                  * dataTypes[i].getDataTypeSize();
+        }
         // TEXT data mem size
         if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
           textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
index 755c30bd9a..5f1579f398 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
@@ -215,7 +215,7 @@ public class TimeSeriesSchemaCache {
       Long latestFlushedTime) {
     SchemaCacheEntry entry;
     List<Integer> missingMeasurements = new ArrayList<>();
-    dualKeyCache.compute(
+    dualKeyCache.computeLast(
         new IDualKeyCacheComputation<PartialPath, String, SchemaCacheEntry>() {
           @Override
           public PartialPath getFirstKey() {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java
index cb11dd6cb2..236aacd8e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java
@@ -40,6 +40,8 @@ public interface IDualKeyCache<FK, SK, V> {
    */
   void compute(IDualKeyCacheComputation<FK, SK, V> computation);
 
+  void computeLast(IDualKeyCacheComputation<FK, SK, V> computation);
+
   /** put the cache value into cache */
   void put(FK firstKey, SK secondKey, V value);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java
index 270f3ec131..de4423af3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java
@@ -96,6 +96,34 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
     }
   }
 
+  @Override
+  public void computeLast(IDualKeyCacheComputation<FK, SK, V> computation) {
+    FK firstKey = computation.getFirstKey();
+    ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.get(firstKey);
+    SK[] secondKeyList = computation.getSecondKeyList();
+    if (cacheEntryGroup == null) {
+      for (int i = 0; i < secondKeyList.length; i++) {
+        computation.computeValue(i, null);
+      }
+      cacheStats.recordMiss(secondKeyList.length);
+    } else {
+      T cacheEntry;
+      int hitCount = 0;
+      for (int i = 0; i < secondKeyList.length; i++) {
+        cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
+        if (cacheEntry == null) {
+          computation.computeValue(i, null);
+        } else {
+          computation.computeValue(i, cacheEntry.getValue());
+          cacheEntryManager.access(cacheEntry);
+          hitCount++;
+        }
+      }
+      cacheStats.recordHit(hitCount);
+      cacheStats.recordMiss(secondKeyList.length - hitCount);
+    }
+  }
+
   @Override
   public void put(FK firstKey, SK secondKey, V value) {
     int usedMemorySize = putToCache(firstKey, secondKey, value);