You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/11 07:10:06 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: change
bufferwrite flush to asychronous and add MemTablePool to manage all
memtables
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 4a76448 change bufferwrite flush to asychronous and add MemTablePool to manage all memtables
4a76448 is described below
commit 4a7644852fb183d1ea11b813f95b6cc7aaab3d0b
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 11 15:09:31 2019 +0800
change bufferwrite flush to asychronous and add MemTablePool to manage all memtables
---
.../engine/bufferwrite/BufferWriteProcessor.java | 54 +++++--
.../bufferwrite/RestorableTsFileIOWriter.java | 16 +-
.../db/engine/filenode/FileNodeProcessor.java | 27 ++--
.../iotdb/db/engine/memtable/AbstractMemTable.java | 6 +
.../apache/iotdb/db/engine/memtable/IMemTable.java | 12 ++
.../db/engine/memtable/MemTableFlushCallBack.java | 9 ++
.../db/engine/memtable/MemTableFlushTask.java | 180 +++++++++++----------
.../db/engine/memtable/MemTableFlushUtil.java | 4 +-
.../iotdb/db/engine/memtable/MemTablePool.java | 59 +++++++
.../db/engine/memtable/PrimitiveMemTable.java | 5 +
.../db/query/reader/merge/PriorityMergeReader.java | 3 -
.../apache/iotdb/db/service/CloseMergeService.java | 2 +-
.../bufferwrite/RestorableTsFileIOWriterTest.java | 4 +-
.../apache/iotdb/tsfile/write/TsFileWriter.java | 9 +-
.../write/writer/NativeRestorableIOWriter.java | 2 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 65 +++++---
.../iotdb/tsfile/write/TsFileIOWriterTest.java | 3 +-
17 files changed, 299 insertions(+), 161 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index d6b7ccc..b331a57 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -21,11 +21,12 @@ package org.apache.iotdb.db.engine.bufferwrite;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -42,7 +43,7 @@ import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
+import org.apache.iotdb.db.engine.memtable.MemTablePool;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.pool.FlushManager;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -53,13 +54,14 @@ import org.apache.iotdb.db.utils.ImmediateFuture;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,9 +74,14 @@ public class BufferWriteProcessor extends Processor {
private volatile Future<Boolean> closeFuture = new BWCloseFuture(new ImmediateFuture<>(true));
private ReentrantLock flushQueryLock = new ReentrantLock();
private AtomicLong memSize = new AtomicLong();
- private long memThreshold = TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
+ private long memThreshold = TSFileConfig.groupSizeInByte;
private IMemTable workMemTable;
private IMemTable flushMemTable;
+
+ // each flush task has a flushId, IO task should scheduled by this id
+ private long flushId = -1;
+ private List<IMemTable> flushingMemTables = new ArrayList<>();
+
private Action bufferwriteFlushAction;
//private Action bufferwriteCloseAction;
private Consumer<BufferWriteProcessor> bufferwriteCloseConsumer;
@@ -153,7 +160,7 @@ public class BufferWriteProcessor extends Processor {
throw new BufferWriteProcessorException(e);
}
if (workMemTable == null) {
- workMemTable = new PrimitiveMemTable();
+ workMemTable = MemTablePool.getInstance().getEmptyMemTable();
} else {
workMemTable.clear();
}
@@ -290,7 +297,7 @@ public class BufferWriteProcessor extends Processor {
flushQueryLock.lock();
LOGGER.info("BufferWrite Processor {} get flushQueryLock for switchWorkToFlush successfully", getProcessorName());
try {
- IMemTable temp = flushMemTable == null ? new PrimitiveMemTable() : flushMemTable;
+ IMemTable temp = flushMemTable == null ? MemTablePool.getInstance().getEmptyMemTable() : flushMemTable;
flushMemTable = workMemTable;
workMemTable = temp;
isFlush = true;
@@ -314,26 +321,40 @@ public class BufferWriteProcessor extends Processor {
}
}
+ private void removeFlushedMemTable(IMemTable memTable, TsFileIOWriter tsFileIOWriter) {
+ this.writeLock();
+ tsFileIOWriter.mergeChunkGroupMetaData();
+ try {
+ flushingMemTables.remove(memTable);
+ } finally {
+ this.writeUnlock();
+ }
+ }
+
/**
* the caller mast guarantee no other concurrent caller entering this function.
*
* @param displayMessage message that will appear in system log.
+ * @param tmpMemTableToFlush
* @param version the operation version that will tagged on the to be flushed memtable
* (i.e., ChunkGroup)
* @param walTaskId used for declaring what the wal file name suffix is.
* @return true if successfully.
*/
- private boolean flushTask(String displayMessage, long version, long walTaskId) {
+ private boolean flushTask(String displayMessage,
+ IMemTable tmpMemTableToFlush, long version,
+ long walTaskId) {
boolean result;
long flushStartTime = System.currentTimeMillis();
LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(),
displayMessage);
try {
- if (flushMemTable != null && !flushMemTable.isEmpty()) {
+ if (tmpMemTableToFlush != null && !tmpMemTableToFlush.isEmpty()) {
// flush data
- MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, getProcessorName());
- tableFlushTask.flushMemTable(fileSchema, flushMemTable, version);
+ MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, getProcessorName(), flushId,
+ this::removeFlushedMemTable);
+ tableFlushTask.flushMemTable(fileSchema, tmpMemTableToFlush, version);
// write restore information
writer.flush();
}
@@ -419,7 +440,14 @@ public class BufferWriteProcessor extends Processor {
walTaskId = 0;
}
valueCount = 0;
- switchWorkToFlush();
+
+ synchronized (flushingMemTables) {
+ flushingMemTables.add(workMemTable);
+ }
+ IMemTable tmpMemTableToFlush = workMemTable;
+ workMemTable = MemTablePool.getInstance().getEmptyMemTable();
+
+ flushId++;
long version = versionController.nextVersion();
BasicMemController.getInstance().releaseUsage(this, memSize.get());
memSize.set(0);
@@ -429,7 +457,7 @@ public class BufferWriteProcessor extends Processor {
"flush memtable for bufferwrite processor {} synchronously for close task.",
getProcessorName(), FlushManager.getInstance().getWaitingTasksNumber(),
FlushManager.getInstance().getCorePoolSize());
- flushTask("synchronously", version, walTaskId);
+ flushTask("synchronously", tmpMemTableToFlush, version, walTaskId);
flushFuture = new ImmediateFuture<>(true);
} else {
if (LOGGER.isInfoEnabled()) {
@@ -439,7 +467,7 @@ public class BufferWriteProcessor extends Processor {
FlushManager.getInstance().getCorePoolSize());
}
flushFuture = FlushManager.getInstance().submit(() -> flushTask("asynchronously",
- version, walTaskId));
+ tmpMemTableToFlush, version, walTaskId));
}
} else {
flushFuture = new ImmediateFuture<>(true);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
index a7542b5..5362636 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
@@ -100,8 +100,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
// cut off tsfile
this.out = new DefaultTsFileOutput(new FileOutputStream(insertFile, true));
out.truncate(position);
- this.chunkGroupMetaDataList = existedMetadatas;
- lastFlushedChunkGroupIndex = chunkGroupMetaDataList.size();
+ this.flushedChunkGroupMetaDataList = existedMetadatas;
+ lastFlushedChunkGroupIndex = flushedChunkGroupMetaDataList.size();
append = new ArrayList<>();
// recovery the metadata
recoverMetadata(existedMetadatas);
@@ -122,8 +122,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
LOGGER.info("remove unsealed tsfile restore file failed: ", e);
}
this.out = new DefaultTsFileOutput(new FileOutputStream(insertFile));
- this.chunkGroupMetaDataList = new ArrayList<>();
- lastFlushedChunkGroupIndex = chunkGroupMetaDataList.size();
+ this.flushedChunkGroupMetaDataList = new ArrayList<>();
+ lastFlushedChunkGroupIndex = flushedChunkGroupMetaDataList.size();
append = new ArrayList<>();
startFile();
isNewResource = true;
@@ -298,11 +298,11 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
* @return a list of chunkgroup metadata
*/
private List<ChunkGroupMetaData> getAppendedRowGroupMetadata() {
- if (lastFlushedChunkGroupIndex < chunkGroupMetaDataList.size()) {
+ if (lastFlushedChunkGroupIndex < flushedChunkGroupMetaDataList.size()) {
append.clear();
- append.addAll(chunkGroupMetaDataList
- .subList(lastFlushedChunkGroupIndex, chunkGroupMetaDataList.size()));
- lastFlushedChunkGroupIndex = chunkGroupMetaDataList.size();
+ append.addAll(flushedChunkGroupMetaDataList
+ .subList(lastFlushedChunkGroupIndex, flushedChunkGroupMetaDataList.size()));
+ lastFlushedChunkGroupIndex = flushedChunkGroupMetaDataList.size();
}
return append;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index ded5594..78b41ae 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -33,17 +33,13 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -51,7 +47,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -181,7 +176,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
*/
private Map<String, Action> parameters;
private FileSchema fileSchema;
- private Action flushFileNodeProcessorAction = () -> {
+ private Action fileNodeFlushAction = () -> {
synchronized (fileNodeProcessorStore) {
try {
writeStoreToDisk(fileNodeProcessorStore);
@@ -403,7 +398,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
void addIntervalFileNode(TsFileResource tsFileResource) throws ActionException {
newFileNodes.add(tsFileResource);
fileNodeProcessorStore.setNewFileNodes(newFileNodes);
- flushFileNodeProcessorAction.act();
+ fileNodeFlushAction.act();
}
/**
@@ -496,7 +491,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bufferwriteFlushAction);
//parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bufferwriteCloseAction);
parameters
- .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+ .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fileNodeFlushAction);
String baseDir = directories
.getTsFileFolder(newFileNodes.get(newFileNodes.size() - 1).getBaseDirIndex());
if (LOGGER.isInfoEnabled()) {
@@ -524,7 +519,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
LOGGER.info("The filenode processor {} will recovery the overflow processor.",
getProcessorName());
parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
- parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+ parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fileNodeFlushAction);
try {
overflowProcessor = new OverflowProcessor(getProcessorName(), parameters, fileSchema,
versionController);
@@ -571,7 +566,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
params.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bufferwriteFlushAction);
//params.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bufferwriteCloseAction);
params
- .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+ .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fileNodeFlushAction);
String baseDir = directories.getNextFolderForTsfile();
LOGGER.info("Allocate folder {} for the new bufferwrite processor.", baseDir);
// construct processor or restore
@@ -605,7 +600,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// construct processor or restore
params.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
params
- .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+ .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fileNodeFlushAction);
overflowProcessor = new OverflowProcessor(processorName, params, fileSchema,
versionController);
} else if (overflowProcessor.isClosed()) {
@@ -922,7 +917,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
bufferwriteFlushAction.act();
fileNodeProcessorStore.setNewFileNodes(newFileNodes);
// reconstruct the inverted index of the newFileNodes
- flushFileNodeProcessorAction.act();
+ fileNodeFlushAction.act();
addAllFileIntoIndex(newFileNodes);
} catch (Exception e) {
LOGGER.error("Failed to append the tsfile {} to filenode processor {}.", appendFile,
@@ -1575,9 +1570,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
if (mergeIsChunkGroupHasData) {
// end the new rowGroupMetadata
- long size = mergeFileWriter.getPos() - mergeStartPos;
- footer = new ChunkGroupFooter(deviceId, size, numOfChunk);
- mergeFileWriter.endChunkGroup(footer, 0);
+ mergeFileWriter.endChunkGroup(0);
}
}
} finally {
@@ -2089,7 +2082,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
Objects.equals(newMultiPassCount, that.newMultiPassCount) &&
Objects.equals(parameters, that.parameters) &&
Objects.equals(fileSchema, that.fileSchema) &&
- Objects.equals(flushFileNodeProcessorAction, that.flushFileNodeProcessorAction) &&
+ Objects.equals(fileNodeFlushAction, that.fileNodeFlushAction) &&
Objects.equals(bufferwriteFlushAction, that.bufferwriteFlushAction) &&
Objects.equals(overflowFlushAction, that.overflowFlushAction);
}
@@ -2102,7 +2095,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath,
lastMergeTime, bufferWriteProcessor, overflowProcessor, oldMultiPassTokenSet,
newMultiPassTokenSet, oldMultiPassCount, newMultiPassCount, shouldRecovery, parameters,
- fileSchema, flushFileNodeProcessorAction, bufferwriteFlushAction,
+ fileSchema, fileNodeFlushAction, bufferwriteFlushAction,
overflowFlushAction, multiPassLockToken);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 097f7f7..7e57a6b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -174,4 +174,10 @@ public abstract class AbstractMemTable implements IMemTable {
}
return null;
}
+
+ @Override
+ public boolean containSeries(String deviceId, String measurementId) {
+ Map<String, IWritableMemChunk> deviceMap = memTableMap.get(deviceId);
+ return deviceMap != null && deviceMap.containsKey(measurementId);
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 53ea2bc..bca8d05 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -22,6 +22,8 @@ import java.util.Map;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
/**
* IMemTable is designed to store data points which are not flushed into TsFile yet. An instance of
@@ -42,6 +44,14 @@ public interface IMemTable {
int size();
+ default void insert(TSRecord tsRecord) {
+ for (DataPoint dataPoint : tsRecord.dataPointList) {
+ write(tsRecord.deviceId, dataPoint.getMeasurementId(), dataPoint.getType(),
+ tsRecord.time,
+ dataPoint.getValue().toString());
+ }
+ }
+
ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
Map<String, String> props);
@@ -69,4 +79,6 @@ public interface IMemTable {
* @return a MemTable with the same data as this one.
*/
IMemTable copy();
+
+ boolean containSeries(String deviceId, String measurementId);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushCallBack.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushCallBack.java
new file mode 100644
index 0000000..d5e2ae5
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushCallBack.java
@@ -0,0 +1,9 @@
+package org.apache.iotdb.db.engine.memtable;
+
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+@FunctionalInterface
+public interface MemTableFlushCallBack {
+
+ void afterFlush(IMemTable memTable, TsFileIOWriter tsFileIOWriter);
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
index 7b28992..27926c0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
@@ -1,19 +1,15 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.memtable;
@@ -49,63 +45,38 @@ public class MemTableFlushTask {
private ConcurrentLinkedQueue memoryTaskQueue = new ConcurrentLinkedQueue();
private boolean stop = false;
private String processName;
+ private long flushId;
- //the position of the tsfile when a flush task begins.
- long startPos;
+ private MemTableFlushCallBack flushCallBack;
+ private IMemTable memTable;
+
+ public MemTableFlushTask(TsFileIOWriter writer, String processName, long flushId,
+ MemTableFlushCallBack callBack) {
+ this.tsFileIoWriter = writer;
+ this.processName = processName;
+ this.flushId = flushId;
+ this.flushCallBack = callBack;
+ ioFlushThread.start();
+ memoryFlushThread.start();
+ }
- //TODO more better way is: for each TsFile, assign it a Executors.singleThreadPool,
- // rather than per each memtable.
- private Thread ioFlushThread = new Thread(() -> {
- long ioTime = 0;
- while (!stop) {
- Object seriesWriterOrEndChunkGroupTask = ioTaskQueue.poll();
- if (seriesWriterOrEndChunkGroupTask == null ) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOGGER.error("BufferWrite Processor {}, io flush task is interrupted.", processName, e);
- }
- } else {
- long starTime = System.currentTimeMillis();
- try {
- if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
- ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
- } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
- tsFileIoWriter.startFlushChunkGroup((String)seriesWriterOrEndChunkGroupTask);
- } else {
- ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
- long memSize = tsFileIoWriter.getPos() - startPos;
- ChunkGroupFooter footer = new ChunkGroupFooter(task.deviceId, memSize, task.seriesNumber);
- tsFileIoWriter.endChunkGroup(footer, task.version);
- startPos = tsFileIoWriter.getPos();
- task.finished = true;
- }
- } catch (IOException e) {
- LOGGER.error("BufferWrite Processor {}, io error.", processName, e);
- throw new RuntimeException(e);
- }
- ioTime += System.currentTimeMillis() - starTime;
- }
- }
- LOGGER.info("BufferWrite Processor {}, flushing a memtable into disk: io cost {} ms.", processName, ioTime );
- });
private Thread memoryFlushThread = new Thread(() -> {
long memSerializeTime = 0;
while (!stop) {
Object task = memoryTaskQueue.poll();
- if (task == null ) {
+ if (task == null) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
LOGGER.error("BufferWrite Processor {}, io flush task is interrupted.", processName, e);
}
} else {
- if (task instanceof String) {
+ if (task instanceof String) {
ioTaskQueue.add(task);
} else if (task instanceof ChunkGroupIoTask) {
ioTaskQueue.add(task);
- }else {
+ } else {
long starTime = System.currentTimeMillis();
Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
@@ -123,48 +94,91 @@ public class MemTableFlushTask {
}
}
}
- LOGGER.info("BufferWrite Processor {},flushing a memtable into disk: serialize data into mem cost {} ms.", processName, memSerializeTime );
+ LOGGER.info(
+ "BufferWrite Processor {},flushing a memtable into disk: serialize data into mem cost {} ms.",
+ processName, memSerializeTime);
+ });
+
+
+ //TODO more better way is: for each TsFile, assign it a Executors.singleThreadPool,
+ // rather than per each memtable.
+ private Thread ioFlushThread = new Thread(() -> {
+ long ioTime = 0;
+ while (tsFileIoWriter.getFlushID().get() != flushId) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ LOGGER.error("Processor {}, last flush io task is not finished.", processName, e);
+ }
+ }
+ while (!stop) {
+ Object seriesWriterOrEndChunkGroupTask = ioTaskQueue.poll();
+ if (seriesWriterOrEndChunkGroupTask == null) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ LOGGER.error("BufferWrite Processor {}, io flush task is interrupted.", processName, e);
+ }
+ } else {
+ long starTime = System.currentTimeMillis();
+ try {
+ if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
+ ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
+ } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
+ tsFileIoWriter.startFlushChunkGroup((String) seriesWriterOrEndChunkGroupTask);
+ } else {
+ ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
+ tsFileIoWriter.endChunkGroup(task.version);
+ task.finished = true;
+ }
+ } catch (IOException e) {
+ LOGGER.error("BufferWrite Processor {}, io error.", processName, e);
+ throw new RuntimeException(e);
+ }
+ ioTime += System.currentTimeMillis() - starTime;
+ }
+ }
+ flushCallBack.afterFlush(memTable, tsFileIoWriter);
+ MemTablePool.getInstance().release(memTable);
+ tsFileIoWriter.getFlushID().getAndIncrement();
+ LOGGER.info("BufferWrite Processor {}, flushing a memtable into disk: io cost {} ms.",
+ processName, ioTime);
});
- public MemTableFlushTask(TsFileIOWriter writer, String processName) {
- this.tsFileIoWriter = writer;
- this.processName = processName;
- ioFlushThread.start();
- memoryFlushThread.start();
- }
- private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
+ private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
TSDataType dataType)
throws IOException {
for (TimeValuePair timeValuePair : tvPairs) {
switch (dataType) {
case BOOLEAN:
- seriesWriterImpl
- .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
+ seriesWriterImpl
+ .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
break;
case INT32:
- seriesWriterImpl.write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getInt());
+ seriesWriterImpl.write(timeValuePair.getTimestamp(),
+ timeValuePair.getValue().getInt());
break;
case INT64:
- seriesWriterImpl.write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getLong());
+ seriesWriterImpl.write(timeValuePair.getTimestamp(),
+ timeValuePair.getValue().getLong());
break;
case FLOAT:
- seriesWriterImpl.write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getFloat());
+ seriesWriterImpl.write(timeValuePair.getTimestamp(),
+ timeValuePair.getValue().getFloat());
break;
case DOUBLE:
- seriesWriterImpl
- .write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getDouble());
+ seriesWriterImpl
+ .write(timeValuePair.getTimestamp(),
+ timeValuePair.getValue().getDouble());
break;
case TEXT:
- seriesWriterImpl
- .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
+ seriesWriterImpl
+ .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
break;
default:
- LOGGER.error("BufferWrite Processor {}, don't support data type: {}", processName, dataType);
+ LOGGER.error("BufferWrite Processor {}, don't support data type: {}", processName,
+ dataType);
break;
}
}
@@ -173,11 +187,10 @@ public class MemTableFlushTask {
/**
* the function for flushing memtable.
*/
- public void flushMemTable(FileSchema fileSchema,
- IMemTable imemTable, long version) throws IOException {
+ public void flushMemTable(FileSchema fileSchema, IMemTable imemTable, long version) {
long sortTime = 0;
- startPos = tsFileIoWriter.getPos();
ChunkGroupIoTask theLastTask = EMPTY_TASK;
+ this.memTable = imemTable;
for (String deviceId : imemTable.getMemTableMap().keySet()) {
memoryTaskQueue.add(deviceId);
int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
@@ -194,12 +207,14 @@ public class MemTableFlushTask {
memoryTaskQueue.add(theLastTask);
}
LOGGER.info(
- "BufferWrite Processor {}, flushing a memtable into disk: data sort time cost {} ms.", processName, sortTime );
+ "BufferWrite Processor {}, flushing a memtable into disk: data sort time cost {} ms.",
+ processName, sortTime);
while (!theLastTask.finished) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
- LOGGER.error("BufferWrite Processor {}, flush memtable table thread is interrupted.", processName, e);
+ LOGGER.error("BufferWrite Processor {}, flush memtable table thread is interrupted.",
+ processName, e);
throw new RuntimeException(e);
}
}
@@ -208,6 +223,7 @@ public class MemTableFlushTask {
static class ChunkGroupIoTask {
+
int seriesNumber;
String deviceId;
long version;
@@ -216,6 +232,7 @@ public class MemTableFlushTask {
public ChunkGroupIoTask(int seriesNumber, String deviceId, long version) {
this(seriesNumber, deviceId, version, false);
}
+
public ChunkGroupIoTask(int seriesNumber, String deviceId, long version, boolean finished) {
this.seriesNumber = seriesNumber;
this.deviceId = deviceId;
@@ -223,6 +240,7 @@ public class MemTableFlushTask {
this.finished = finished;
}
}
+
private static ChunkGroupIoTask EMPTY_TASK = new ChunkGroupIoTask(0, "", 0, true);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
index 4aa12e9..e9a24c7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
@@ -110,9 +110,7 @@ public class MemTableFlushUtil {
ioTime += System.currentTimeMillis() - startTime;
}
tmpTime = System.currentTimeMillis();
- long memSize = tsFileIoWriter.getPos() - startPos;
- ChunkGroupFooter footer = new ChunkGroupFooter(deviceId, memSize, seriesNumber);
- tsFileIoWriter.endChunkGroup(footer, version);
+ tsFileIoWriter.endChunkGroup(version);
ioTime += System.currentTimeMillis() - tmpTime;
}
LOGGER.info(
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
new file mode 100644
index 0000000..1fb4a64
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -0,0 +1,59 @@
+package org.apache.iotdb.db.engine.memtable;
+
+import java.util.Stack;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemTablePool {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MemTablePool.class);
+
+ private Stack<IMemTable> emptyMemTables;
+ private int capacity = 10;
+ private int size = 0;
+
+ private static final MemTablePool INSTANCE = new MemTablePool();
+
+ public MemTablePool() {
+ emptyMemTables = new Stack<>();
+ }
+
+ public IMemTable getEmptyMemTable() {
+ synchronized (emptyMemTables) {
+ if (emptyMemTables.isEmpty() && size < capacity) {
+ size++;
+ return new PrimitiveMemTable();
+ } else if (!emptyMemTables.isEmpty()){
+ return emptyMemTables.pop();
+ }
+ }
+ // wait until some one has released a memtable
+ while (true) {
+ if(!emptyMemTables.isEmpty()) {
+ synchronized (emptyMemTables) {
+ if (!emptyMemTables.isEmpty()){
+ return emptyMemTables.pop();
+ }
+ }
+ }
+ try {
+ Thread.sleep(20);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Unexpected interruption", e);
+ }
+ }
+ }
+
+
+ public void release(IMemTable memTable) {
+ synchronized (emptyMemTables) {
+ memTable.clear();
+ emptyMemTables.push(memTable);
+ }
+ }
+
+ public static MemTablePool getInstance() {
+ return INSTANCE;
+ }
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 455196a..eac3323 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -44,4 +44,9 @@ public class PrimitiveMemTable extends AbstractMemTable {
return new PrimitiveMemTable(newMap);
}
+
+ @Override
+ public boolean equals(Object obj) {
+ return this == obj;
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReader.java
index e76d142..251bd9d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReader.java
@@ -33,9 +33,6 @@ import org.apache.iotdb.db.utils.TimeValuePair;
*/
public class PriorityMergeReader implements IPointReader {
- public static final int LOW_PRIORITY = 1;
- public static final int HIGH_PRIORITY = 2;
-
private List<IPointReader> readerList = new ArrayList<>();
private List<Integer> priorityList = new ArrayList<>();
private PriorityQueue<Element> heap = new PriorityQueue<>();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java b/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java
index 4131396..0e7cb8a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java
@@ -143,7 +143,7 @@ public class CloseMergeService implements IService {
@Override
public void run() {
service.scheduleWithFixedDelay(mergeService, MERGE_DELAY, MERGE_PERIOD, TimeUnit.SECONDS);
- service.scheduleWithFixedDelay(closeService, CLOSE_DELAY, CLOSE_PERIOD, TimeUnit.SECONDS);
+// service.scheduleWithFixedDelay(closeService, CLOSE_DELAY, CLOSE_PERIOD, TimeUnit.SECONDS);
while (!service.isShutdown()) {
synchronized (service) {
try {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
index 1f8e3e2..3c239c8 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
@@ -167,7 +167,7 @@ public class RestorableTsFileIOWriterTest {
memTable.write("d2", "s2", TSDataType.INT32, 2, "1");
memTable.write("d2", "s2", TSDataType.INT32, 4, "1");
//MemTableFlushUtil.flushMemTable(schema, writer, memTable, 0);
- MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, "test");
+ MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, "test", 0L, (a,b) -> {});
tableFlushTask.flushMemTable(schema, memTable, 0);
writer.flush();
writer.appendMetadata();
@@ -222,7 +222,7 @@ public class RestorableTsFileIOWriterTest {
MemTableTestUtils.dataType0);
//MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable, 0);
- MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, "test");
+ MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, "test", 0L, (a,b) -> {});
tableFlushTask.flushMemTable(MemTableTestUtils.getFileSchema(), memTable, 0);
writer.flush();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index b201324..52cfdf9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -74,11 +74,6 @@ public class TsFileWriter implements AutoCloseable{
**/
private long recordCountForNextMemCheck = 100;
private long chunkGroupSizeThreshold;
- /**
- * In an individual TsFile, version number is not meaningful, added
- * only for tests.
- */
- private long version = 0;
/**
* init this TsFileWriter.
@@ -295,8 +290,7 @@ public class TsFileWriter implements AutoCloseable{
"Flushed data size is inconsistent with computation! Estimated: %d, Actuall: %d",
chunkGroupFooter.getDataSize(), fileWriter.getPos() - pos));
}
-
- fileWriter.endChunkGroup(chunkGroupFooter, version++);
+ fileWriter.endChunkGroup(0);
}
long actualTotalChunkGroupSize = fileWriter.getPos() - totalMemStart;
LOG.info("total chunk group size:{}", actualTotalChunkGroupSize);
@@ -317,6 +311,7 @@ public class TsFileWriter implements AutoCloseable{
*
* @throws IOException exception in IO
*/
+ @Override
public void close() throws IOException {
LOG.info("start close file");
flushAllChunkGroups();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
index 2496d42..6fb4ee5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
@@ -68,7 +68,7 @@ public class NativeRestorableIOWriter extends TsFileIOWriter {
out.close();
return;
}
- truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetaDataList, !append);
+ truncatedPosition = reader.selfCheck(knownSchemas, flushedChunkGroupMetaDataList, !append);
if (truncatedPosition == TsFileCheckStatus.COMPLETE_FILE && !append) {
this.canWrite = false;
out.close();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index c83c8e8..75301e1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
import org.apache.iotdb.tsfile.file.MetaMarker;
@@ -65,7 +66,9 @@ public class TsFileIOWriter {
}
protected TsFileOutput out;
- protected List<ChunkGroupMetaData> chunkGroupMetaDataList = new ArrayList<>();
+ private AtomicLong flushID = new AtomicLong(0);
+ protected List<ChunkGroupMetaData> flushedChunkGroupMetaDataList = new ArrayList<>();
+ protected List<ChunkGroupMetaData> flushingChunkGroupMetaDataList = new ArrayList<>();
private ChunkGroupMetaData currentChunkGroupMetaData;
private ChunkMetaData currentChunkMetaData;
protected boolean canWrite = true;
@@ -104,14 +107,14 @@ public class TsFileIOWriter {
* data in the TsFileOutput matches the given metadata list
*
* @param out the target output
- * @param chunkGroupMetaDataList existing chunkgroups' metadata
+ * @param flushedChunkGroupMetaDataList existing chunkgroups' metadata
* @throws IOException if I/O error occurs
*/
- public TsFileIOWriter(TsFileOutput out, List<ChunkGroupMetaData> chunkGroupMetaDataList)
+ public TsFileIOWriter(TsFileOutput out, List<ChunkGroupMetaData> flushedChunkGroupMetaDataList)
throws IOException {
this.out = out;
- this.chunkGroupMetaDataList = chunkGroupMetaDataList;
- if (chunkGroupMetaDataList.isEmpty()) {
+ this.flushedChunkGroupMetaDataList = flushedChunkGroupMetaDataList;
+ if (flushedChunkGroupMetaDataList.isEmpty()) {
startFile();
}
}
@@ -131,6 +134,19 @@ public class TsFileIOWriter {
out.write(magicStringBytes);
}
+ public AtomicLong getFlushID() {
+ return flushID;
+ }
+
+ /**
+ * move ChunkGroupMetadata from flushingChunkGroupMetaDataList to flushedChunkGroupMetaDataList
+ * only flushedChunkGroupMetaDataList is visible for query
+ */
+ public void mergeChunkGroupMetaData() {
+ flushedChunkGroupMetaDataList.addAll(flushingChunkGroupMetaDataList);
+ flushingChunkGroupMetaDataList.clear();
+ }
+
/**
* start a {@linkplain ChunkGroupMetaData ChunkGroupMetaData}.
*
@@ -142,6 +158,21 @@ public class TsFileIOWriter {
}
/**
+ * end chunk and write some log.
+ */
+ public void endChunkGroup(long version) throws IOException {
+ long dataSize = out.getPosition() - currentChunkGroupMetaData.getStartOffsetOfChunkGroup();
+ ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter(currentChunkGroupMetaData.getDeviceID(),
+ dataSize, currentChunkGroupMetaData.getChunkMetaDataList().size());
+ chunkGroupFooter.serializeTo(out.wrapAsStream());
+ currentChunkGroupMetaData.setEndOffsetOfChunkGroup(out.getPosition());
+ currentChunkGroupMetaData.setVersion(version);
+ flushingChunkGroupMetaDataList.add(currentChunkGroupMetaData);
+ LOG.debug("end chunk group:{}", currentChunkGroupMetaData);
+ currentChunkGroupMetaData = null;
+ }
+
+ /**
* start a {@linkplain ChunkMetaData ChunkMetaData}.
*
* @param descriptor - measurement of this time series
@@ -200,20 +231,6 @@ public class TsFileIOWriter {
}
/**
- * end chunk and write some log.
- *
- * @param chunkGroupFooter -use to serialize
- */
- public void endChunkGroup(ChunkGroupFooter chunkGroupFooter, long version) throws IOException {
- chunkGroupFooter.serializeTo(out.wrapAsStream());
- currentChunkGroupMetaData.setEndOffsetOfChunkGroup(out.getPosition());
- currentChunkGroupMetaData.setVersion(version);
- chunkGroupMetaDataList.add(currentChunkGroupMetaData);
- LOG.debug("end chunk group:{}", currentChunkGroupMetaData);
- currentChunkGroupMetaData = null;
- }
-
- /**
* write {@linkplain TsFileMetaData TSFileMetaData} to output stream and close it.
*
* @param schema FileSchema
@@ -221,6 +238,8 @@ public class TsFileIOWriter {
*/
public void endFile(FileSchema schema) throws IOException {
+ mergeChunkGroupMetaData();
+
// serialize the SEPARATOR of MetaData and ChunkGroups
ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
@@ -229,7 +248,7 @@ public class TsFileIOWriter {
LOG.debug("get time series list:{}", schemaDescriptors);
Map<String, TsDeviceMetadataIndex> tsDeviceMetadataIndexMap = flushTsDeviceMetaDataAndGetIndex(
- this.chunkGroupMetaDataList);
+ this.flushedChunkGroupMetaDataList);
TsFileMetaData tsFileMetaData = new TsFileMetaData(tsDeviceMetadataIndexMap, schemaDescriptors,
TSFileConfig.CURRENT_VERSION);
@@ -254,7 +273,7 @@ public class TsFileIOWriter {
}
/**
- * 1. group chunkGroupMetaDataList to TsDeviceMetadata 2. flush TsDeviceMetadata 3. get
+ * 1. group flushedChunkGroupMetaDataList to TsDeviceMetadata 2. flush TsDeviceMetadata 3. get
* TsDeviceMetadataIndex
*
* @param chunkGroupMetaDataList all chunk group metadata in memory
@@ -321,12 +340,12 @@ public class TsFileIOWriter {
}
/**
- * get chunkGroupMetaDataList.
+ * get flushedChunkGroupMetaDataList.
*
* @return - List of chunkGroupMetaData
*/
public List<ChunkGroupMetaData> getChunkGroupMetaDatas() {
- return chunkGroupMetaDataList;
+ return flushedChunkGroupMetaDataList;
}
public boolean canWrite() {
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index 8c6cf00..a00e39a 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -60,8 +60,7 @@ public class TsFileIOWriterTest {
writer.startFlushChunk(measurementSchema, measurementSchema.getCompressor(),
measurementSchema.getType(), measurementSchema.getEncodingType(), statistics, 0, 0, 0, 0);
writer.endChunk(0);
- ChunkGroupFooter footer = new ChunkGroupFooter(deviceId, 0, 1);
- writer.endChunkGroup(footer, 0);
+ writer.endChunkGroup(0);
// end file
writer.endFile(fileSchema);