You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/11/01 08:43:17 UTC

[incubator-iotdb] branch dev_new_merge_strategy updated: temp save

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_new_merge_strategy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_new_merge_strategy by this push:
     new d8e3dd8  temp save
d8e3dd8 is described below

commit d8e3dd8674debae10e3ac86a7b92a6cd76024b1c
Author: jt <jt...@163.com>
AuthorDate: Fri Nov 1 16:42:45 2019 +0800

    temp save
---
 .../merge/inplace/task/MergeMultiChunkTask.java    | 10 ++-
 .../db/engine/merge/manage/MergeResource.java      | 10 +++
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  2 +-
 .../java/org/apache/iotdb/db/utils/MergeUtils.java | 22 ++++++-
 .../db/engine/merge/inplace/MergePerfTest.java     |  5 +-
 .../org/apache/iotdb/tsfile/read/common/Chunk.java | 10 ++-
 .../apache/iotdb/tsfile/read/common/DiskChunk.java | 75 ++++++++++++++++++++++
 .../tsfile/read/common}/util/ChunkProvider.java    |  2 +-
 .../read/common}/util/ChunkProviderExecutor.java   |  2 +-
 .../read/common}/util/DirectChunkProvider.java     |  2 +-
 .../read/common}/util/SharedMapChunkProvider.java  |  5 +-
 .../common}/util/SharedQueueChunkProvider.java     |  2 +-
 12 files changed, 126 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeMultiChunkTask.java
index 1421ef3..53e6afb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeMultiChunkTask.java
@@ -27,7 +27,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -39,10 +38,8 @@ import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.merge.inplace.recover.InplaceMergeLogger;
 import org.apache.iotdb.db.engine.merge.IMergePathSelector;
 import org.apache.iotdb.db.engine.merge.NaivePathSelector;
-import org.apache.iotdb.db.engine.merge.util.ChunkProvider;
-import org.apache.iotdb.db.engine.merge.util.DirectChunkProvider;
-import org.apache.iotdb.db.engine.merge.util.SharedMapChunkProvider;
-import org.apache.iotdb.db.engine.merge.util.SharedQueueChunkProvider;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProvider;
+import org.apache.iotdb.tsfile.read.common.util.SharedMapChunkProvider;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -241,7 +238,8 @@ class MergeMultiChunkTask {
       idx++;
       ptWrittens[i] = 0;
     }
-    ChunkProvider provider = new DirectChunkProvider(reader);
+//    ChunkProvider provider = new DirectChunkProvider(reader);
+    ChunkProvider provider = new SharedMapChunkProvider(seqChunkMeta, reader);
 
     mergedChunkNum.set(0);
     unmergedChunkNum.set(0);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index d0cbbd4..fca1f7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -151,6 +151,16 @@ public class MergeResource {
     return ret;
   }
 
+  public IPointReader[] getUnseqReadersV2(List<Path> paths) throws IOException {
+    List<Chunk>[] pathChunks = MergeUtils.collectUnseqChunksV2(paths, unseqFiles, this);
+    IPointReader[] ret = new IPointReader[paths.size()];
+    for (int i = 0; i < paths.size(); i++) {
+      TSDataType dataType = getSchema(paths.get(i).getMeasurement()).getType();
+      ret[i] = new CachedUnseqResourceMergeReader(pathChunks[i], dataType);
+    }
+    return ret;
+  }
+
   /**
    * Construct the a new or get an existing ChunkWriter of a measurement. Different timeseries of
    * the same measurement shares the same instance.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index b874849..d99bc69 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.flush.FlushManager;
-import org.apache.iotdb.db.engine.merge.util.ChunkProviderExecutor;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProviderExecutor;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.StatMonitor;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index ef9a49b..9e01aa6 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -196,6 +196,26 @@ public class MergeUtils {
     return ret;
   }
 
+  public static List<Chunk>[] collectUnseqChunksV2(List<Path> paths,
+      List<TsFileResource> unseqResources, MergeResource mergeResource) throws IOException {
+    List<Chunk>[] ret = new List[paths.size()];
+    for (int i = 0; i < paths.size(); i++) {
+      ret[i] = new ArrayList<>();
+    }
+    PriorityQueue<MetaListEntry> chunkMetaHeap = new PriorityQueue<>();
+
+    for (TsFileResource tsFileResource : unseqResources) {
+
+      TsFileSequenceReader tsFileReader = mergeResource.getFileReader(tsFileResource);
+      // prepare metaDataList
+      buildMetaHeap(paths, tsFileReader, mergeResource, tsFileResource, chunkMetaHeap);
+
+      // read chunks order by their position
+      collectUnseqChunks(chunkMetaHeap, tsFileReader, ret);
+    }
+    return ret;
+  }
+
   private static void buildMetaHeap(List<Path> paths, TsFileSequenceReader tsFileReader,
       MergeResource resource, TsFileResource tsFileResource, PriorityQueue<MetaListEntry> chunkMetaHeap)
       throws IOException {
@@ -217,9 +237,9 @@ public class MergeUtils {
       }
     }
   }
-
   private static void collectUnseqChunks(PriorityQueue<MetaListEntry> chunkMetaHeap,
       TsFileSequenceReader tsFileReader, List<Chunk>[] ret) throws IOException {
+
     while (!chunkMetaHeap.isEmpty()) {
       MetaListEntry metaListEntry = chunkMetaHeap.poll();
       ChunkMetaData currMeta = metaListEntry.current();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergePerfTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergePerfTest.java
index d51a665..32f063d 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergePerfTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergePerfTest.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.merge.MergeTest;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
-import org.apache.iotdb.db.engine.merge.util.ChunkProviderExecutor;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProviderExecutor;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 
@@ -41,6 +41,7 @@ public class MergePerfTest extends MergeTest {
     tempSGDir.mkdirs();
     setUp();
     System.out.println("Files prepared.");
+    Thread.sleep(3000);
 
     long timeConsumption = System.currentTimeMillis();
     MergeResource resource = new MergeResource(seqResources, unseqResources);
@@ -64,7 +65,7 @@ public class MergePerfTest extends MergeTest {
 
     perfTest.seqFileNum = 5;
     perfTest.unseqFileNum = 5;
-    perfTest.measurementNum = 10000;
+    perfTest.measurementNum = 2000;
     perfTest.deviceNum = 1;
     perfTest.ptNum = 5000;
     perfTest.flushInterval = 1000;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
index 11832b6..79d940d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
@@ -26,12 +26,16 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
  */
 public class Chunk {
 
-  private ChunkHeader chunkHeader;
-  private ByteBuffer chunkData;
+  ChunkHeader chunkHeader;
+  ByteBuffer chunkData;
   /**
    * All data with timestamp <= deletedAt are considered deleted.
    */
-  private long deletedAt;
+  long deletedAt;
+
+  Chunk() {
+
+  }
 
   public Chunk(ChunkHeader header, ByteBuffer buffer, long deletedAt) {
     this.chunkHeader = header;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DiskChunk.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DiskChunk.java
new file mode 100644
index 0000000..4fba0df
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DiskChunk.java
@@ -0,0 +1,75 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iotdb.tsfile.read.common;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DiskChunk extends Chunk {
+
+  private static final Logger logger = LoggerFactory.getLogger(DiskChunk.class);
+
+  private boolean onDisk = true;
+  private ChunkMetaData metaData;
+  private ChunkProvider provider;
+
+  public DiskChunk(ChunkMetaData metaData,
+      ChunkProvider provider) {
+    this.metaData = metaData;
+    this.provider = provider;
+  }
+
+  private void load() {
+    if (onDisk) {
+      Chunk chunk;
+      try {
+        chunk = provider.require(metaData);
+        this.chunkHeader = chunk.chunkHeader;
+        this.chunkData = chunk.chunkData;
+        onDisk = false;
+      } catch (InterruptedException | IOException e) {
+        logger.error("Cannot load a chunk, metadata: {}", metaData, e);
+      }
+    }
+  }
+
+  @Override
+  public ChunkHeader getHeader() {
+
+    return super.getHeader();
+  }
+
+  @Override
+  public ByteBuffer getData() {
+    return super.getData();
+  }
+
+  @Override
+  public long getDeletedAt() {
+    return super.getDeletedAt();
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/ChunkProvider.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProvider.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/util/ChunkProvider.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProvider.java
index 4f6c7ba..37555ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/ChunkProvider.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProvider.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.engine.merge.util;
+package org.apache.iotdb.tsfile.read.common.util;
 
 import java.io.IOException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/ChunkProviderExecutor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/util/ChunkProviderExecutor.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java
index b8b9987..295e479 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/ChunkProviderExecutor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.engine.merge.util;
+package org.apache.iotdb.tsfile.read.common.util;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/DirectChunkProvider.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/DirectChunkProvider.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/util/DirectChunkProvider.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/DirectChunkProvider.java
index eb1c991..36be6ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/DirectChunkProvider.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/DirectChunkProvider.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.engine.merge.util;
+package org.apache.iotdb.tsfile.read.common.util;
 
 import java.io.IOException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/SharedMapChunkProvider.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/SharedMapChunkProvider.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/util/SharedMapChunkProvider.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/SharedMapChunkProvider.java
index b21ffd2..dd9f9bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/SharedMapChunkProvider.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/SharedMapChunkProvider.java
@@ -19,13 +19,10 @@
  *
  */
 
-package org.apache.iotdb.db.engine.merge.util;
+package org.apache.iotdb.tsfile.read.common.util;
 
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.Deque;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/SharedQueueChunkProvider.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/SharedQueueChunkProvider.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/util/SharedQueueChunkProvider.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/SharedQueueChunkProvider.java
index 18ac34d..bd9e8e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/SharedQueueChunkProvider.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/SharedQueueChunkProvider.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.engine.merge.util;
+package org.apache.iotdb.tsfile.read.common.util;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;