You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/03/14 02:59:29 UTC

[iotdb] branch vectorMemTable updated: change flush memtable from by column to by row

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/vectorMemTable by this push:
     new d6152e2  change flush memtable from by column to by row
d6152e2 is described below

commit d6152e26f1b73ca3e24f287e3666f5ce2fd32568
Author: HTHou <hh...@outlook.com>
AuthorDate: Sun Mar 14 10:58:07 2021 +0800

    change flush memtable from by column to by row
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  88 +++++----------
 .../db/engine/storagegroup/TsFileProcessor.java    |  13 ++-
 .../iotdb/db/utils/datastructure/VectorTVList.java | 122 ++++++++++++++++++++-
 .../file/metadata/statistics/Statistics.java       |   2 +
 4 files changed, 162 insertions(+), 63 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 7f9c91f..c60ec2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -164,11 +164,6 @@ public class MemTableFlushTask {
         private void writeOneSeries(
             TVList tvPairs, IChunkWriter seriesWriterImpl, TSDataType dataType) {
 
-          if (dataType == TSDataType.VECTOR) {
-            writeOneVectorSeries(tvPairs, seriesWriterImpl);
-            return;
-          }
-
           for (int i = 0; i < tvPairs.size(); i++) {
             long time = tvPairs.getTime(i);
 
@@ -202,10 +197,35 @@ public class MemTableFlushTask {
                 seriesWriterImpl.write(time, tvPairs.getBinary(i), false);
                 break;
               case VECTOR:
-                // TODO:
-                //                for ( : tvPairs.getVector(i)) {
-                //                  seriesWriterImpl.write(time, tvPairs.getVector(i)[], get);
-                //                }
+                VectorTVList vectorTVPairs = (VectorTVList) tvPairs;
+                List<TSDataType> dataTypes = vectorTVPairs.getTsDataTypes();
+                for (int j = 0; j < dataTypes.size(); j++) {
+                  switch (dataTypes.get(j)) {
+                    case BOOLEAN:
+                      seriesWriterImpl.write(time, vectorTVPairs.getBoolean(i, j), false);
+                      break;
+                    case INT32:
+                      seriesWriterImpl.write(time, vectorTVPairs.getInt(i, j), false);
+                      break;
+                    case INT64:
+                      seriesWriterImpl.write(time, vectorTVPairs.getLong(i, j), false);
+                      break;
+                    case FLOAT:
+                      seriesWriterImpl.write(time, vectorTVPairs.getFloat(i, j), false);
+                      break;
+                    case DOUBLE:
+                      seriesWriterImpl.write(time, vectorTVPairs.getDouble(i, j), false);
+                      break;
+                    case TEXT:
+                      seriesWriterImpl.write(time, vectorTVPairs.getBinary(i, j), false);
+                      break;
+                    default:
+                      LOGGER.error(
+                          "Storage group {} does not support data type: {}", storageGroup, dataType);
+                      break;
+                  }
+                }
+                seriesWriterImpl.write(time);
                 break;
               default:
                 LOGGER.error(
@@ -215,56 +235,6 @@ public class MemTableFlushTask {
           }
         }
 
-        private void writeOneVectorSeries(TVList tvPairs, IChunkWriter seriesWriterImpl) {
-          VectorTVList tvList = (VectorTVList) tvPairs;
-          List<TSDataType> dataTypes = tvList.getTsDataTypes();
-          List<List<Object>> values = tvList.getValues();
-          for (int i = 0; i < dataTypes.size(); i++) {
-            List<Object> columnValues = values.get(i);
-            for (int j = 0; j < tvList.size(); j++) {
-              long time = tvList.getTime(j);
-              // skip duplicated data
-              if ((i + 1 < tvList.size() && (time == tvPairs.getTime(i + 1)))) {
-                continue;
-              }
-              int valueIndex = tvList.getValueIndex(j);
-              if (valueIndex >= tvList.size()) {
-                throw new ArrayIndexOutOfBoundsException(valueIndex);
-              }
-              int arrayIndex = valueIndex / ARRAY_SIZE;
-              int elementIndex = valueIndex % ARRAY_SIZE;
-              switch (dataTypes.get(i)) {
-                case TEXT:
-                  seriesWriterImpl.write(
-                      time, ((Binary[]) columnValues.get(arrayIndex))[elementIndex], false);
-                  break;
-                case FLOAT:
-                  seriesWriterImpl.write(
-                      time, ((float[]) columnValues.get(arrayIndex))[elementIndex], false);
-                  break;
-                case INT32:
-                  seriesWriterImpl.write(
-                      time, ((int[]) columnValues.get(arrayIndex))[elementIndex], false);
-                  break;
-                case INT64:
-                  seriesWriterImpl.write(
-                      time, ((long[]) columnValues.get(arrayIndex))[elementIndex], false);
-                  break;
-                case DOUBLE:
-                  seriesWriterImpl.write(
-                      time, ((double[]) columnValues.get(arrayIndex))[elementIndex], false);
-                  break;
-                case BOOLEAN:
-                  seriesWriterImpl.write(
-                      time, ((boolean[]) columnValues.get(arrayIndex))[elementIndex], false);
-                  break;
-                default:
-                  break;
-              }
-            }
-          }
-        }
-
         @SuppressWarnings("squid:S135")
         @Override
         public void run() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1e705ab..6a4d9ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -290,9 +291,15 @@ public class TsFileProcessor {
       }
       if (workMemTable.checkIfChunkDoesNotExist(deviceId, insertRowPlan.getMeasurements()[i])) {
         // ChunkMetadataIncrement
-        chunkMetadataIncrement +=
-            ChunkMetadata.calculateRamSize(
-                insertRowPlan.getMeasurements()[i], insertRowPlan.getDataTypes()[i]);
+        if (insertRowPlan.getDataTypes()[i] == TSDataType.VECTOR) {
+          // TODO: insertRowPlan
+          // chunkMetadataIncrement += VectorChunkMetadata.calculateRamSize(insertRowPlan....);
+        }
+        else {
+          chunkMetadataIncrement +=
+              ChunkMetadata.calculateRamSize(
+                  insertRowPlan.getMeasurements()[i], insertRowPlan.getDataTypes()[i]);
+        }
         memTableIncrement += TVList.tvListArrayMemSize(insertRowPlan.getDataTypes()[i]);
       } else {
         // here currentChunkPointNum >= 1
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
index d54da4f..44b7952 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
@@ -150,6 +150,126 @@ public class VectorTVList extends TVList {
     return TsPrimitiveType.getByType(TSDataType.VECTOR, vector);
   }
 
+  /** Reads the int in {@code column} for the row at list position {@code index}. */
+  public int getInt(int index, int column) {
+    if (index >= size) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    // indices maps a list position to the slot used to look the row's values up
+    int[] slotChunk = indices.get(index / ARRAY_SIZE);
+    return getIntByValueIndex(slotChunk[index % ARRAY_SIZE], column);
+  }
+
+  /** Returns the int held in storage slot {@code valueIndex} of the given {@code column}. */
+  private int getIntByValueIndex(int valueIndex, int column) {
+    if (valueIndex >= size) {
+      throw new ArrayIndexOutOfBoundsException(valueIndex);
+    }
+    // a column is a list of arrays: pick the array first, then the element inside it
+    int[] chunk = (int[]) values.get(column).get(valueIndex / ARRAY_SIZE);
+    return chunk[valueIndex % ARRAY_SIZE];
+  }
+
+  /** Reads the long in {@code column} for the row at list position {@code index}. */
+  public long getLong(int index, int column) {
+    if (index >= size) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    // indices maps a list position to the slot used to look the row's values up
+    int[] slotChunk = indices.get(index / ARRAY_SIZE);
+    return getLongByValueIndex(slotChunk[index % ARRAY_SIZE], column);
+  }
+
+  /** Returns the long held in storage slot {@code valueIndex} of the given {@code column}. */
+  private long getLongByValueIndex(int valueIndex, int column) {
+    if (valueIndex >= size) {
+      throw new ArrayIndexOutOfBoundsException(valueIndex);
+    }
+    // a column is a list of arrays: pick the array first, then the element inside it
+    long[] chunk = (long[]) values.get(column).get(valueIndex / ARRAY_SIZE);
+    return chunk[valueIndex % ARRAY_SIZE];
+  }
+
+  /** Reads the float in {@code column} for the row at list position {@code index}. */
+  public float getFloat(int index, int column) {
+    if (index >= size) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    // indices maps a list position to the slot used to look the row's values up
+    int[] slotChunk = indices.get(index / ARRAY_SIZE);
+    return getFloatByValueIndex(slotChunk[index % ARRAY_SIZE], column);
+  }
+
+  /** Returns the float held in storage slot {@code valueIndex} of the given {@code column}. */
+  private float getFloatByValueIndex(int valueIndex, int column) {
+    if (valueIndex >= size) {
+      throw new ArrayIndexOutOfBoundsException(valueIndex);
+    }
+    // a column is a list of arrays: pick the array first, then the element inside it
+    float[] chunk = (float[]) values.get(column).get(valueIndex / ARRAY_SIZE);
+    return chunk[valueIndex % ARRAY_SIZE];
+  }
+
+  /** Reads the double in {@code column} for the row at list position {@code index}. */
+  public double getDouble(int index, int column) {
+    if (index >= size) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    // indices maps a list position to the slot used to look the row's values up
+    int[] slotChunk = indices.get(index / ARRAY_SIZE);
+    return getDoubleByValueIndex(slotChunk[index % ARRAY_SIZE], column);
+  }
+
+  /** Returns the double held in storage slot {@code valueIndex} of the given {@code column}. */
+  private double getDoubleByValueIndex(int valueIndex, int column) {
+    if (valueIndex >= size) {
+      throw new ArrayIndexOutOfBoundsException(valueIndex);
+    }
+    // cast to the primitive double[] (not Double[]), matching the other typed helpers
+    double[] chunk = (double[]) values.get(column).get(valueIndex / ARRAY_SIZE);
+    return chunk[valueIndex % ARRAY_SIZE];
+  }
+
+  /** Reads the Binary in {@code column} for the row at list position {@code index}. */
+  public Binary getBinary(int index, int column) {
+    if (index >= size) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    // indices maps a list position to the slot used to look the row's values up
+    int[] slotChunk = indices.get(index / ARRAY_SIZE);
+    return getBinaryByValueIndex(slotChunk[index % ARRAY_SIZE], column);
+  }
+
+  /** Returns the Binary held in storage slot {@code valueIndex} of the given {@code column}. */
+  private Binary getBinaryByValueIndex(int valueIndex, int column) {
+    if (valueIndex >= size) {
+      throw new ArrayIndexOutOfBoundsException(valueIndex);
+    }
+    // a column is a list of arrays: pick the array first, then the element inside it
+    Binary[] chunk = (Binary[]) values.get(column).get(valueIndex / ARRAY_SIZE);
+    return chunk[valueIndex % ARRAY_SIZE];
+  }
+
+  /** Reads the boolean in {@code column} for the row at list position {@code index}. */
+  public boolean getBoolean(int index, int column) {
+    if (index >= size) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    // indices maps a list position to the slot used to look the row's values up
+    int[] slotChunk = indices.get(index / ARRAY_SIZE);
+    return getBooleanByValueIndex(slotChunk[index % ARRAY_SIZE], column);
+  }
+
+  /** Returns the boolean held in storage slot {@code valueIndex} of the given {@code column}. */
+  private boolean getBooleanByValueIndex(int valueIndex, int column) {
+    if (valueIndex >= size) {
+      throw new ArrayIndexOutOfBoundsException(valueIndex);
+    }
+    // a column is a list of arrays: pick the array first, then the element inside it
+    boolean[] chunk = (boolean[]) values.get(column).get(valueIndex / ARRAY_SIZE);
+    return chunk[valueIndex % ARRAY_SIZE];
+  }
+
   public List<List<Object>> getValues() {
     return values;
   }
@@ -334,7 +454,7 @@ public class VectorTVList extends TVList {
   @Override
   protected TimeValuePair getTimeValuePair(
       int index, long time, Integer floatPrecision, TSEncoding encoding) {
-    return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.VECTOR, getBinary(index)));
+    return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.VECTOR, getVector(index)));
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 759a751..cd1abe1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -98,6 +98,8 @@ public abstract class Statistics<T> {
         return DoubleStatistics.DOUBLE_STATISTICS_FIXED_RAM_SIZE;
       case FLOAT:
         return FloatStatistics.FLOAT_STATISTICS_FIXED_RAM_SIZE;
+      case VECTOR:
+        return TimeStatistics.TIME_STATISTICS_FIXED_RAM_SIZE;
       default:
         throw new UnknownColumnTypeException(type.toString());
     }