You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/11/01 07:51:10 UTC
[iotdb] branch new_vector updated: refacter get readonlymemchunk
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_vector by this push:
new 1bfa4e0 refacter get readonlymemchunk
1bfa4e0 is described below
commit 1bfa4e0bbe49fc6423278fa7df751a4ce918c656
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Nov 1 15:50:30 2021 +0800
refacter get readonlymemchunk
---
.../iotdb/db/engine/memtable/AbstractMemTable.java | 57 ++--------------------
.../db/engine/memtable/IWritableMemChunk.java | 4 +-
.../db/engine/memtable/VectorWritableMemChunk.java | 12 ++++-
.../iotdb/db/engine/memtable/WritableMemChunk.java | 2 +-
.../apache/iotdb/db/metadata/path/AlignedPath.java | 34 ++++++++++++-
.../iotdb/db/metadata/path/MeasurementPath.java | 30 ++++++++++++
.../apache/iotdb/db/metadata/path/PartialPath.java | 17 +++++++
7 files changed, 98 insertions(+), 58 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index a71f62e..dcbc9ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
@@ -89,15 +90,6 @@ public abstract class AbstractMemTable implements IMemTable {
}
/**
- * check whether the given seriesPath is within this memtable.
- *
- * @return true if seriesPath is within this memtable
- */
- private boolean checkPath(String deviceId, String measurement) {
- return memTableMap.containsKey(deviceId) && memTableMap.get(deviceId).containsKey(measurement);
- }
-
- /**
* create this MemChunk if it's not exist
*
* @param deviceId device id
@@ -180,7 +172,7 @@ public abstract class AbstractMemTable implements IMemTable {
}
VectorMeasurementSchema vectorSchema =
new VectorMeasurementSchema(
- null,
+ AlignedPath.VECTOR_PLACEHOLDER,
measurements.toArray(new String[measurements.size()]),
types.toArray(new TSDataType[measurements.size()]),
encodings.toArray(new TSEncoding[measurements.size()]),
@@ -279,7 +271,7 @@ public abstract class AbstractMemTable implements IMemTable {
}
VectorMeasurementSchema vectorSchema =
new VectorMeasurementSchema(
- null,
+ AlignedPath.VECTOR_PLACEHOLDER,
measurements.toArray(new String[measurements.size()]),
types.toArray(new TSDataType[measurements.size()]),
encodings.toArray(new TSEncoding[measurements.size()]),
@@ -366,48 +358,7 @@ public abstract class AbstractMemTable implements IMemTable {
public ReadOnlyMemChunk query(
PartialPath fullPath, long ttlLowerBound, List<TimeRange> deletionList)
throws IOException, QueryProcessException {
- // if (partialVectorSchema.getType() == TSDataType.VECTOR) {
- // if (!memTableMap.containsKey(deviceId)) {
- // return null;
- // }
- // IWritableMemChunk vectorMemChunk =
- // memTableMap.get(deviceId).get(partialVectorSchema.getMeasurementId());
- // if (vectorMemChunk == null) {
- // return null;
- // }
- //
- // List<String> measurementIdList = partialVectorSchema.getSubMeasurementsList();
- // List<Integer> columns = new ArrayList<>();
- // IMeasurementSchema vectorSchema = vectorMemChunk.getSchema();
- // for (String queryingMeasurement : measurementIdList) {
- // columns.add(vectorSchema.getSubMeasurementsList().indexOf(queryingMeasurement));
- // }
- // // get sorted tv list is synchronized so different query can get right sorted list
- // reference
- // TVList vectorTvListCopy = vectorMemChunk.getSortedTvListForQuery(columns);
- // int curSize = vectorTvListCopy.size();
- // return new ReadOnlyMemChunk(partialVectorSchema, vectorTvListCopy, curSize,
- // deletionList);
- // } else {
- // if (!checkPath(deviceId, measurement)) {
- // return null;
- // }
- // IWritableMemChunk memChunk =
- // memTableMap.get(deviceId).get(partialVectorSchema.getMeasurementId());
- // // get sorted tv list is synchronized so different query can get right sorted list
- // reference
- // TVList chunkCopy = memChunk.getSortedTvListForQuery();
- // int curSize = chunkCopy.size();
- // return new ReadOnlyMemChunk(
- // measurement,
- // partialVectorSchema.getType(),
- // partialVectorSchema.getEncodingType(),
- // chunkCopy,
- // partialVectorSchema.getProps(),
- // curSize,
- // deletionList);
- // }
- return null;
+ return fullPath.getReadOnlyMemChunkFromMemTable(memTableMap, deletionList);
}
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 11e5ddd..1d27b0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -101,10 +101,10 @@ public interface IWritableMemChunk {
*
* <p>the mechanism is just like copy on write
*
- * @param columnIndexList indices of queried columns in the full VectorTVList
+ * @param measurementList the measurementList to be queried
* @return sorted tv list
*/
- TVList getSortedTvListForQuery(List<Integer> columnIndexList);
+ TVList getSortedTvListForQuery(List<String> measurementList);
/**
* served for flush requests. The logic is just same as getSortedTVListForQuery, but without add
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
index c0d44bc..2bcf273 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
@@ -37,6 +37,10 @@ public class VectorWritableMemChunk implements IWritableMemChunk {
this.list = TVListAllocator.getInstance().allocate(schema.getSubMeasurementsTSDataTypeList());
}
+ public boolean containsMeasurement(String measurementId) {
+ return vectorIdIndexMap.containsKey(measurementId);
+ }
+
@Override
public void putLong(long t, long v) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
@@ -166,10 +170,16 @@ public class VectorWritableMemChunk implements IWritableMemChunk {
}
@Override
- public TVList getSortedTvListForQuery(List<Integer> columnIndexList) {
+ public TVList getSortedTvListForQuery(List<String> measurementList) {
sortTVList();
// increase reference count
list.increaseReferenceCount();
+ List<Integer> columnIndexList = new ArrayList<>();
+ for (String measurement : measurementList) {
+ if (vectorIdIndexMap.containsKey(measurement)) {
+ columnIndexList.add(vectorIdIndexMap.get(measurement));
+ }
+ }
return list.getTvListByColumnIndex(columnIndexList);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 4ad3d81..8dcfc07 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -200,7 +200,7 @@ public class WritableMemChunk implements IWritableMemChunk {
}
@Override
- public synchronized TVList getSortedTvListForQuery(List<Integer> columnIndexList) {
+ public synchronized TVList getSortedTvListForQuery(List<String> measurementList) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index 4d3640d..8d4b205 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -19,22 +19,30 @@
package org.apache.iotdb.db.metadata.path;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.VectorWritableMemChunk;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.AlignedSeriesReader;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -47,7 +55,7 @@ import java.util.Set;
public class AlignedPath extends PartialPath {
// todo improve vector implementation by remove this placeholder
- private static final String VECTOR_PLACEHOLDER = "";
+ public static final String VECTOR_PLACEHOLDER = "";
private List<String> measurementList;
private List<IMeasurementSchema> schemaList;
@@ -225,4 +233,28 @@ public class AlignedPath extends PartialPath {
valueFilter,
ascending);
}
+
+ @Override
+ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
+ Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList)
+ throws QueryProcessException, IOException {
+ if (!memTableMap.containsKey(getDevice())) {
+ return null;
+ }
+ VectorWritableMemChunk vectorMemChunk =
+ ((VectorWritableMemChunk) memTableMap.get(getDevice()).get(VECTOR_PLACEHOLDER));
+ List<String> validMeasurementList = new ArrayList<>();
+ for (String measurement : measurementList) {
+ if (vectorMemChunk.containsMeasurement(measurement)) {
+ validMeasurementList.add(measurement);
+ }
+ }
+ if (validMeasurementList.isEmpty()) {
+ return null;
+ }
+ // get sorted tv list is synchronized so different query can get right sorted list reference
+ TVList vectorTvListCopy = vectorMemChunk.getSortedTvListForQuery(validMeasurementList);
+ int curSize = vectorTvListCopy.size();
+ return new ReadOnlyMemChunk(getMeasurementSchema(), vectorTvListCopy, curSize, deletionList);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index b328990..512a625 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -19,18 +19,25 @@
package org.apache.iotdb.db.metadata.path;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.SeriesReader;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class MeasurementPath extends PartialPath {
@@ -140,4 +147,27 @@ public class MeasurementPath extends PartialPath {
valueFilter,
ascending);
}
+
+ @Override
+ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
+ Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList)
+ throws QueryProcessException, IOException {
+ // check If Memtable Contains this path
+ if (memTableMap.containsKey(getDevice())
+ && memTableMap.get(getDevice()).containsKey(getMeasurement())) {
+ return null;
+ }
+ IWritableMemChunk memChunk = memTableMap.get(getDevice()).get(getMeasurement());
+ // get sorted tv list is synchronized so different query can get right sorted list reference
+ TVList chunkCopy = memChunk.getSortedTvListForQuery();
+ int curSize = chunkCopy.size();
+ return new ReadOnlyMemChunk(
+ getMeasurement(),
+ measurementSchema.getType(),
+ measurementSchema.getEncodingType(),
+ chunkCopy,
+ measurementSchema.getProps(),
+ curSize,
+ deletionList);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
index fde7705..648049c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
@@ -18,10 +18,13 @@
*/
package org.apache.iotdb.db.metadata.path;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.filter.TsFileFilter;
@@ -30,16 +33,19 @@ import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@@ -387,4 +393,15 @@ public class PartialPath extends Path implements Comparable<Path> {
boolean ascending) {
throw new UnsupportedOperationException("Should call exact sub class!");
}
+
+ /**
+ * get the ReadOnlyMemChunk from the given MemTable.
+ *
+ * @return ReadOnlyMemChunk
+ */
+ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
+ Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList)
+ throws QueryProcessException, IOException {
+ throw new UnsupportedOperationException("Should call exact sub class!");
+ }
}