You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/11/01 08:43:17 UTC
[incubator-iotdb] branch dev_new_merge_strategy updated: temp save
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_new_merge_strategy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_new_merge_strategy by this push:
new d8e3dd8 temp save
d8e3dd8 is described below
commit d8e3dd8674debae10e3ac86a7b92a6cd76024b1c
Author: jt <jt...@163.com>
AuthorDate: Fri Nov 1 16:42:45 2019 +0800
temp save
---
.../merge/inplace/task/MergeMultiChunkTask.java | 10 ++-
.../db/engine/merge/manage/MergeResource.java | 10 +++
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +-
.../java/org/apache/iotdb/db/utils/MergeUtils.java | 22 ++++++-
.../db/engine/merge/inplace/MergePerfTest.java | 5 +-
.../org/apache/iotdb/tsfile/read/common/Chunk.java | 10 ++-
.../apache/iotdb/tsfile/read/common/DiskChunk.java | 75 ++++++++++++++++++++++
.../tsfile/read/common}/util/ChunkProvider.java | 2 +-
.../read/common}/util/ChunkProviderExecutor.java | 2 +-
.../read/common}/util/DirectChunkProvider.java | 2 +-
.../read/common}/util/SharedMapChunkProvider.java | 5 +-
.../common}/util/SharedQueueChunkProvider.java | 2 +-
12 files changed, 126 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeMultiChunkTask.java
index 1421ef3..53e6afb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeMultiChunkTask.java
@@ -27,7 +27,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -39,10 +38,8 @@ import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.inplace.recover.InplaceMergeLogger;
import org.apache.iotdb.db.engine.merge.IMergePathSelector;
import org.apache.iotdb.db.engine.merge.NaivePathSelector;
-import org.apache.iotdb.db.engine.merge.util.ChunkProvider;
-import org.apache.iotdb.db.engine.merge.util.DirectChunkProvider;
-import org.apache.iotdb.db.engine.merge.util.SharedMapChunkProvider;
-import org.apache.iotdb.db.engine.merge.util.SharedQueueChunkProvider;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProvider;
+import org.apache.iotdb.tsfile.read.common.util.SharedMapChunkProvider;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.reader.IPointReader;
@@ -241,7 +238,8 @@ class MergeMultiChunkTask {
idx++;
ptWrittens[i] = 0;
}
- ChunkProvider provider = new DirectChunkProvider(reader);
+// ChunkProvider provider = new DirectChunkProvider(reader);
+ ChunkProvider provider = new SharedMapChunkProvider(seqChunkMeta, reader);
mergedChunkNum.set(0);
unmergedChunkNum.set(0);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index d0cbbd4..fca1f7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -151,6 +151,16 @@ public class MergeResource {
return ret;
}
+ public IPointReader[] getUnseqReadersV2(List<Path> paths) throws IOException {
+ List<Chunk>[] pathChunks = MergeUtils.collectUnseqChunksV2(paths, unseqFiles, this);
+ IPointReader[] ret = new IPointReader[paths.size()];
+ for (int i = 0; i < paths.size(); i++) {
+ TSDataType dataType = getSchema(paths.get(i).getMeasurement()).getType();
+ ret[i] = new CachedUnseqResourceMergeReader(pathChunks[i], dataType);
+ }
+ return ret;
+ }
+
/**
* Construct the a new or get an existing ChunkWriter of a measurement. Different timeseries of
* the same measurement shares the same instance.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index b874849..d99bc69 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.flush.FlushManager;
-import org.apache.iotdb.db.engine.merge.util.ChunkProviderExecutor;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProviderExecutor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.StatMonitor;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index ef9a49b..9e01aa6 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -196,6 +196,26 @@ public class MergeUtils {
return ret;
}
+ public static List<Chunk>[] collectUnseqChunksV2(List<Path> paths,
+ List<TsFileResource> unseqResources, MergeResource mergeResource) throws IOException {
+ List<Chunk>[] ret = new List[paths.size()];
+ for (int i = 0; i < paths.size(); i++) {
+ ret[i] = new ArrayList<>();
+ }
+ PriorityQueue<MetaListEntry> chunkMetaHeap = new PriorityQueue<>();
+
+ for (TsFileResource tsFileResource : unseqResources) {
+
+ TsFileSequenceReader tsFileReader = mergeResource.getFileReader(tsFileResource);
+ // prepare metaDataList
+ buildMetaHeap(paths, tsFileReader, mergeResource, tsFileResource, chunkMetaHeap);
+
+ // read chunks order by their position
+ collectUnseqChunks(chunkMetaHeap, tsFileReader, ret);
+ }
+ return ret;
+ }
+
private static void buildMetaHeap(List<Path> paths, TsFileSequenceReader tsFileReader,
MergeResource resource, TsFileResource tsFileResource, PriorityQueue<MetaListEntry> chunkMetaHeap)
throws IOException {
@@ -217,9 +237,9 @@ public class MergeUtils {
}
}
}
-
private static void collectUnseqChunks(PriorityQueue<MetaListEntry> chunkMetaHeap,
TsFileSequenceReader tsFileReader, List<Chunk>[] ret) throws IOException {
+
while (!chunkMetaHeap.isEmpty()) {
MetaListEntry metaListEntry = chunkMetaHeap.poll();
ChunkMetaData currMeta = metaListEntry.current();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergePerfTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergePerfTest.java
index d51a665..32f063d 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergePerfTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/inplace/MergePerfTest.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.merge.MergeTest;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
-import org.apache.iotdb.db.engine.merge.util.ChunkProviderExecutor;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProviderExecutor;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -41,6 +41,7 @@ public class MergePerfTest extends MergeTest {
tempSGDir.mkdirs();
setUp();
System.out.println("Files prepared.");
+ Thread.sleep(3000);
long timeConsumption = System.currentTimeMillis();
MergeResource resource = new MergeResource(seqResources, unseqResources);
@@ -64,7 +65,7 @@ public class MergePerfTest extends MergeTest {
perfTest.seqFileNum = 5;
perfTest.unseqFileNum = 5;
- perfTest.measurementNum = 10000;
+ perfTest.measurementNum = 2000;
perfTest.deviceNum = 1;
perfTest.ptNum = 5000;
perfTest.flushInterval = 1000;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
index 11832b6..79d940d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
@@ -26,12 +26,16 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
*/
public class Chunk {
- private ChunkHeader chunkHeader;
- private ByteBuffer chunkData;
+ ChunkHeader chunkHeader;
+ ByteBuffer chunkData;
/**
* All data with timestamp <= deletedAt are considered deleted.
*/
- private long deletedAt;
+ long deletedAt;
+
+ Chunk() {
+
+ }
public Chunk(ChunkHeader header, ByteBuffer buffer, long deletedAt) {
this.chunkHeader = header;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DiskChunk.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DiskChunk.java
new file mode 100644
index 0000000..4fba0df
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DiskChunk.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.iotdb.tsfile.read.common;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DiskChunk extends Chunk {
+
+ private static final Logger logger = LoggerFactory.getLogger(DiskChunk.class);
+
+ private boolean onDisk = true;
+ private ChunkMetaData metaData;
+ private ChunkProvider provider;
+
+ public DiskChunk(ChunkMetaData metaData,
+ ChunkProvider provider) {
+ this.metaData = metaData;
+ this.provider = provider;
+ }
+
+ private void load() {
+ if (onDisk) {
+ Chunk chunk;
+ try {
+ chunk = provider.require(metaData);
+ this.chunkHeader = chunk.chunkHeader;
+ this.chunkData = chunk.chunkData;
+ onDisk = false;
+ } catch (InterruptedException | IOException e) {
+ logger.error("Cannot load a chunk, metadata: {}", metaData, e);
+ }
+ }
+ }
+
+ @Override
+ public ChunkHeader getHeader() {
+
+ return super.getHeader();
+ }
+
+ @Override
+ public ByteBuffer getData() {
+ return super.getData();
+ }
+
+ @Override
+ public long getDeletedAt() {
+ return super.getDeletedAt();
+ }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/ChunkProvider.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProvider.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/util/ChunkProvider.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProvider.java
index 4f6c7ba..37555ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/ChunkProvider.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProvider.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge.util;
+package org.apache.iotdb.tsfile.read.common.util;
import java.io.IOException;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/ChunkProviderExecutor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/util/ChunkProviderExecutor.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java
index b8b9987..295e479 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/ChunkProviderExecutor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge.util;
+package org.apache.iotdb.tsfile.read.common.util;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/DirectChunkProvider.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/DirectChunkProvider.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/util/DirectChunkProvider.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/DirectChunkProvider.java
index eb1c991..36be6ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/DirectChunkProvider.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/DirectChunkProvider.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge.util;
+package org.apache.iotdb.tsfile.read.common.util;
import java.io.IOException;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/SharedMapChunkProvider.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/SharedMapChunkProvider.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/util/SharedMapChunkProvider.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/SharedMapChunkProvider.java
index b21ffd2..dd9f9bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/SharedMapChunkProvider.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/SharedMapChunkProvider.java
@@ -19,13 +19,10 @@
*
*/
-package org.apache.iotdb.db.engine.merge.util;
+package org.apache.iotdb.tsfile.read.common.util;
-import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
-import java.util.Deque;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/SharedQueueChunkProvider.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/SharedQueueChunkProvider.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/util/SharedQueueChunkProvider.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/SharedQueueChunkProvider.java
index 18ac34d..bd9e8e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/util/SharedQueueChunkProvider.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/SharedQueueChunkProvider.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.merge.util;
+package org.apache.iotdb.tsfile.read.common.util;
import java.util.ArrayDeque;
import java.util.ArrayList;