You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/25 14:49:22 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: add DeduplicatedSortedData

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new fb4c5a1  add DeduplicatedSortedData
fb4c5a1 is described below

commit fb4c5a126ca417d99e5e5e7be4cb1edff243fa19
Author: lta <li...@163.com>
AuthorDate: Tue Jun 25 22:49:07 2019 +0800

    add DeduplicatedSortedData
---
 .../db/engine/memtable/DeduplicatedSortedData.java | 65 ++++++++++++++++++++++
 .../db/engine/memtable/IWritableMemChunk.java      |  2 +
 .../db/engine/memtable/MemTableFlushTaskV2.java    |  9 +--
 .../iotdb/db/engine/memtable/WritableMemChunk.java | 14 +++++
 4 files changed, 86 insertions(+), 4 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/DeduplicatedSortedData.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/DeduplicatedSortedData.java
new file mode 100644
index 0000000..5f67970
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/DeduplicatedSortedData.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.memtable;
+
+import java.util.List;
+import org.apache.iotdb.db.utils.TimeValuePair;
+
+public class DeduplicatedSortedData {
+  private List<TimeValuePair> timeValuePairs;
+
+  private int index;
+
+  private int length;
+
+  private TimeValuePair cachedTimeValuePair;
+
+  private boolean hasCached;
+
+  public DeduplicatedSortedData(List<TimeValuePair> timeValuePairs) {
+    this.timeValuePairs = timeValuePairs;
+    this.timeValuePairs.sort(TimeValuePair::compareTo);
+    this.index = 0;
+    this.length = timeValuePairs.size();
+  }
+
+  public boolean hasNext(){
+    if(!hasCached) {
+      cachedTimeValuePair = null;
+      while (index < length) {
+        if (cachedTimeValuePair == null || cachedTimeValuePair.getTimestamp() == timeValuePairs
+            .get(index).getTimestamp()) {
+          cachedTimeValuePair = timeValuePairs.get(index++);
+          hasCached = true;
+        } else {
+          break;
+        }
+      }
+    }
+    return hasCached;
+  }
+
+  public TimeValuePair next(){
+    if(!hasCached){
+      hasNext();
+    }
+    hasCached = false;
+    return cachedTimeValuePair;
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 22a8aff..1093f61 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -48,4 +48,6 @@ public interface IWritableMemChunk extends TimeValuePairSorter {
   void setTimeOffset(long offset);
 
   void releasePrimitiveArrayList();
+
+  DeduplicatedSortedData getDeduplicatedSortedData();
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index 3a7a994..5784f6b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -81,7 +81,7 @@ public class MemTableFlushTaskV2 {
         // TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
         IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
         MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
-        List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
+        DeduplicatedSortedData sortedTimeValuePairs = series.getDeduplicatedSortedData();
         sortTime += System.currentTimeMillis() - startTime;
         memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
       }
@@ -137,7 +137,7 @@ public class MemTableFlushTaskV2 {
               ioTaskQueue.add(task);
             } else {
               long starTime = System.currentTimeMillis();
-              Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
+              Pair<DeduplicatedSortedData, MeasurementSchema> memorySerializeTask = (Pair<DeduplicatedSortedData, MeasurementSchema>) task;
               ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
               IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
                   PAGE_SIZE_THRESHOLD);
@@ -226,10 +226,11 @@ public class MemTableFlushTaskV2 {
   };
 
 
-  private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
+  private void writeOneSeries(DeduplicatedSortedData tvPairs, IChunkWriter seriesWriterImpl,
       TSDataType dataType)
       throws IOException {
-    for (TimeValuePair timeValuePair : tvPairs) {
+    while (tvPairs.hasNext()) {
+      TimeValuePair timeValuePair = tvPairs.next();
       switch (dataType) {
         case BOOLEAN:
           seriesWriterImpl
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index d0f927d..8890493 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -67,6 +67,7 @@ public class WritableMemChunk implements IWritableMemChunk {
     }
   }
 
+  @Override
   public void write(long insertTime, Object value) {
     switch (dataType) {
       case BOOLEAN:
@@ -140,6 +141,18 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
+  public DeduplicatedSortedData getDeduplicatedSortedData() {
+    int length = list.getTotalDataNumber();
+    List<TimeValuePair> data = new ArrayList<>(length);
+    for (int i = 0; i < length; i++) {
+      if (list.getTimestamp(i) >= timeOffset) {
+        data.add(new TimeValuePairInMemTable(list.getTimestamp(i), TsPrimitiveType.getByType(dataType, list.getValue(i))));
+      }
+    }
+    return new DeduplicatedSortedData(data);
+  }
+
+  @Override
   public void reset() {
     this.list = PrimitiveDataListPool.getInstance().getPrimitiveDataListByDataType(dataType);
   }
@@ -164,4 +177,5 @@ public class WritableMemChunk implements IWritableMemChunk {
     PrimitiveDataListPool.getInstance().release(list);
   }
 
+
 }