You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/25 14:49:22 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: add
DeduplicatedSortedData
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new fb4c5a1 add DeduplicatedSortedData
fb4c5a1 is described below
commit fb4c5a126ca417d99e5e5e7be4cb1edff243fa19
Author: lta <li...@163.com>
AuthorDate: Tue Jun 25 22:49:07 2019 +0800
add DeduplicatedSortedData
---
.../db/engine/memtable/DeduplicatedSortedData.java | 65 ++++++++++++++++++++++
.../db/engine/memtable/IWritableMemChunk.java | 2 +
.../db/engine/memtable/MemTableFlushTaskV2.java | 9 +--
.../iotdb/db/engine/memtable/WritableMemChunk.java | 14 +++++
4 files changed, 86 insertions(+), 4 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/DeduplicatedSortedData.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/DeduplicatedSortedData.java
new file mode 100644
index 0000000..5f67970
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/DeduplicatedSortedData.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.memtable;
+
+import java.util.List;
+import org.apache.iotdb.db.utils.TimeValuePair;
+
+public class DeduplicatedSortedData { // Forward-only cursor over a sorted list of TimeValuePair that yields one pair per run of equal timestamps.
+ private List<TimeValuePair> timeValuePairs; // backing list; sorted in place by the constructor
+
+ private int index; // position in timeValuePairs of the next element not yet examined
+
+ private int length; // size of timeValuePairs captured at construction time
+
+ private TimeValuePair cachedTimeValuePair; // pair that next() will hand out; null when hasNext() found nothing
+
+ private boolean hasCached; // true while cachedTimeValuePair holds a pair not yet returned by next()
+
+ public DeduplicatedSortedData(List<TimeValuePair> timeValuePairs) { // NOTE: keeps a reference to the caller's list and reorders it (no copy is made)
+ this.timeValuePairs = timeValuePairs;
+ this.timeValuePairs.sort(TimeValuePair::compareTo); // order is whatever TimeValuePair.compareTo defines (presumably by timestamp -- TODO confirm)
+ this.index = 0;
+ this.length = timeValuePairs.size();
+ }
+
+ public boolean hasNext(){ // Returns true if a pair is available for next(); repeated calls without next() do not advance the cursor.
+ if(!hasCached) { // only scan forward when no pending pair is waiting to be consumed
+ cachedTimeValuePair = null; // reset so the first element examined below is accepted unconditionally
+ while (index < length) {
+ if (cachedTimeValuePair == null || cachedTimeValuePair.getTimestamp() == timeValuePairs
+ .get(index).getTimestamp()) { // accept the first element, then every following element with the same timestamp
+ cachedTimeValuePair = timeValuePairs.get(index++); // a later element overwrites the earlier one: the last of the run is kept
+ hasCached = true;
+ } else {
+ break; // different timestamp: leave this element for the next scan
+ }
+ }
+ }
+ return hasCached;
+ }
+
+ public TimeValuePair next(){ // Returns the pending pair and marks it consumed; returns null (no exception) once the list is exhausted.
+ if(!hasCached){
+ hasNext(); // nothing pending: try to load the next pair
+ }
+ hasCached = false; // mark consumed so the next hasNext() scans forward again
+ return cachedTimeValuePair;
+ }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 22a8aff..1093f61 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -48,4 +48,6 @@ public interface IWritableMemChunk extends TimeValuePairSorter {
void setTimeOffset(long offset);
void releasePrimitiveArrayList();
+
+ DeduplicatedSortedData getDeduplicatedSortedData();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index 3a7a994..5784f6b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -81,7 +81,7 @@ public class MemTableFlushTaskV2 {
// TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
- List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
+ DeduplicatedSortedData sortedTimeValuePairs = series.getDeduplicatedSortedData();
sortTime += System.currentTimeMillis() - startTime;
memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
}
@@ -137,7 +137,7 @@ public class MemTableFlushTaskV2 {
ioTaskQueue.add(task);
} else {
long starTime = System.currentTimeMillis();
- Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
+ Pair<DeduplicatedSortedData, MeasurementSchema> memorySerializeTask = (Pair<DeduplicatedSortedData, MeasurementSchema>) task;
ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
PAGE_SIZE_THRESHOLD);
@@ -226,10 +226,11 @@ public class MemTableFlushTaskV2 {
};
- private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
+ private void writeOneSeries(DeduplicatedSortedData tvPairs, IChunkWriter seriesWriterImpl,
TSDataType dataType)
throws IOException {
- for (TimeValuePair timeValuePair : tvPairs) {
+ while (tvPairs.hasNext()) {
+ TimeValuePair timeValuePair = tvPairs.next();
switch (dataType) {
case BOOLEAN:
seriesWriterImpl
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index d0f927d..8890493 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -67,6 +67,7 @@ public class WritableMemChunk implements IWritableMemChunk {
}
}
+ @Override
public void write(long insertTime, Object value) {
switch (dataType) {
case BOOLEAN:
@@ -140,6 +141,18 @@ public class WritableMemChunk implements IWritableMemChunk {
}
@Override
+ public DeduplicatedSortedData getDeduplicatedSortedData() { // Copies the entries whose timestamp is >= timeOffset into a new list and wraps it in a DeduplicatedSortedData.
+ int length = list.getTotalDataNumber(); // number of entries to scan, as reported by the backing list
+ List<TimeValuePair> data = new ArrayList<>(length); // presized to the upper bound; may end up shorter after filtering
+ for (int i = 0; i < length; i++) {
+ if (list.getTimestamp(i) >= timeOffset) { // entries with a timestamp below timeOffset are left out
+ data.add(new TimeValuePairInMemTable(list.getTimestamp(i), TsPrimitiveType.getByType(dataType, list.getValue(i)))); // wrap the raw value according to this chunk's dataType
+ }
+ }
+ return new DeduplicatedSortedData(data); // data is a fresh list, so the backing list itself is not reordered here
+ }
+
+ @Override
public void reset() {
this.list = PrimitiveDataListPool.getInstance().getPrimitiveDataListByDataType(dataType); // point this chunk at a list obtained from the pool for its dataType
}
@@ -164,4 +177,5 @@ public class WritableMemChunk implements IWritableMemChunk {
PrimitiveDataListPool.getInstance().release(list);
}
+
}