You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/01/12 07:51:46 UTC
[iotdb] 01/01: finish
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch DirectByteBuffer
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 16e3dc365107b2eebeb9644eadafc5cb56517269
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jan 12 15:50:48 2021 +0800
finish
---
.../db/engine/storagegroup/StorageGroupInfo.java | 51 +++++++++++--
.../engine/storagegroup/StorageGroupProcessor.java | 87 +++++++++++++++++++++-
.../db/engine/storagegroup/TsFileProcessor.java | 12 +--
.../java/org/apache/iotdb/db/utils/MmapUtil.java | 71 ++++++++++++++++++
.../writelog/manager/MultiFileLogNodeManager.java | 36 +++++----
.../db/writelog/manager/WriteLogNodeManager.java | 7 +-
.../db/writelog/node/ExclusiveWriteLogNode.java | 24 ++++--
.../iotdb/db/writelog/node/WriteLogNode.java | 3 +-
.../iotdb/db/writelog/recover/LogReplayer.java | 12 ++-
.../writelog/recover/TsFileRecoverPerformer.java | 16 ++--
.../iotdb/db/writelog/IoTDBLogFileSizeTest.java | 25 ++++++-
.../apache/iotdb/db/writelog/PerformanceTest.java | 50 ++++++++++---
.../iotdb/db/writelog/WriteLogNodeManagerTest.java | 63 +++++++++++++---
.../apache/iotdb/db/writelog/WriteLogNodeTest.java | 78 +++++++++++++++----
.../iotdb/db/writelog/recover/LogReplayerTest.java | 24 +++++-
.../recover/RecoverResourceFromReaderTest.java | 54 +++++++++++---
.../db/writelog/recover/SeqTsFileRecoverTest.java | 51 +++++++++++--
.../writelog/recover/UnseqTsFileRecoverTest.java | 31 +++++++-
18 files changed, 579 insertions(+), 116 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index a31d41a..d0620a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -18,12 +18,18 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.utils.TestOnly;
/**
* The storageGroupInfo records the total memory cost of the Storage Group.
@@ -33,15 +39,15 @@ public class StorageGroupInfo {
private StorageGroupProcessor storageGroupProcessor;
/**
- * The total Storage group memory cost,
- * including unsealed TsFileResource, ChunkMetadata, WAL, primitive arrays and TEXT values
+ * The total Storage group memory cost, including unsealed TsFileResource, ChunkMetadata, WAL,
+ * primitive arrays and TEXT values
*/
private AtomicLong memoryCost;
/**
* The threshold of reporting it's size to SystemInfo
*/
- private long storageGroupSizeReportThreshold =
+ private long storageGroupSizeReportThreshold =
IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold();
private AtomicLong lastReportedSize = new AtomicLong();
@@ -94,13 +100,46 @@ public class StorageGroupInfo {
}
/**
- * When a TsFileProcessor is closing, remove it from reportedTsps, and report to systemInfo
- * to update SG cost.
- *
+ * When a TsFileProcessor is closing, remove it from reportedTsps, and report to systemInfo to
+ * update SG cost.
+ *
* @param tsFileProcessor
*/
public void closeTsFileProcessorAndReportToSystem(TsFileProcessor tsFileProcessor) {
reportedTsps.remove(tsFileProcessor);
SystemInfo.getInstance().resetStorageGroupStatus(this, true);
}
+
+ public Supplier<ByteBuffer[]> getWalSupplier() {
+ if (storageGroupProcessor != null) {
+ return storageGroupProcessor::getWalDirectByteBuffer;
+ } else { // only happens in test
+ return this::walSupplier;
+ }
+ }
+
+ @TestOnly
+ private ByteBuffer[] walSupplier() {
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return buffers;
+ }
+
+ public Consumer<ByteBuffer[]> getWalConsumer() {
+ if (storageGroupProcessor != null) {
+ return storageGroupProcessor::releaseWalBuffer;
+ } else { // only happens in test
+ return this::walConsumer;
+ }
+ }
+
+ @TestOnly
+ private void walConsumer(ByteBuffer[] buffers) {
+ for (ByteBuffer byteBuffer : buffers) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index d32386f..9e2479b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -25,11 +25,14 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -38,7 +41,10 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
@@ -84,6 +90,7 @@ import org.apache.iotdb.db.query.control.QueryFileManager;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
+import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
import org.apache.iotdb.rpc.RpcUtils;
@@ -261,6 +268,79 @@ public class StorageGroupProcessor {
private List<CloseFileListener> customCloseFileListeners = Collections.emptyList();
private List<FlushListener> customFlushListeners = Collections.emptyList();
+ private static final int WAL_BUFFER_SIZE =
+ IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2;
+
+ private static final int MAX_WAL_BYTEBUFFER_NUM =
+ IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition() * 4;
+
+ private static final long DEFAULT_POOL_TRIM_INTERVAL_MILLIS = 10_000;
+
+ private final Deque<ByteBuffer> walByteBufferPool = new LinkedList<>();
+
+ private int currentWalPoolSize = 0;
+
+
+ /**
+ * get the direct byte buffer from pool, each fetch contains two ByteBuffer
+ */
+ public ByteBuffer[] getWalDirectByteBuffer() {
+ ByteBuffer[] res = new ByteBuffer[2];
+ synchronized (walByteBufferPool) {
+ while (walByteBufferPool.isEmpty() && currentWalPoolSize + 2 > MAX_WAL_BYTEBUFFER_NUM) {
+ try {
+ walByteBufferPool.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger
+ .error("getDirectByteBuffer occurs error while waiting for DirectByteBuffer"
+ + "group {}", storageGroupName, e);
+ }
+ }
+ // If the queue is not empty, it must have at least two.
+ if (!walByteBufferPool.isEmpty()) {
+ res[0] = walByteBufferPool.pollFirst();
+ res[1] = walByteBufferPool.pollFirst();
+ } else {
+ // if the queue is empty and current size is less than MAX_BYTEBUFFER_NUM
+ // we can construct another two more new byte buffer
+ currentWalPoolSize += 2;
+ res[0] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE);
+ res[1] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE);
+ }
+ }
+ return res;
+ }
+
+ /**
+ * put the byteBuffer back to pool
+ */
+ public void releaseWalBuffer(ByteBuffer[] byteBuffers) {
+ for (ByteBuffer byteBuffer : byteBuffers) {
+ byteBuffer.clear();
+ }
+ synchronized (walByteBufferPool) {
+ walByteBufferPool.addLast(byteBuffers[0]);
+ walByteBufferPool.addLast(byteBuffers[1]);
+ walByteBufferPool.notifyAll();
+ }
+ }
+
+ /**
+ * trim the size of the pool and release the memory of needless direct byte buffer
+ */
+ private void trimTask() {
+ synchronized (walByteBufferPool) {
+ int expectedSize =
+ (workSequenceTsFileProcessors.size() + workUnsequenceTsFileProcessors.size()) * 2;
+ while (expectedSize < currentWalPoolSize && !walByteBufferPool.isEmpty()) {
+ MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
+ MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
+ currentWalPoolSize -= 2;
+ }
+ }
+ }
+
public StorageGroupProcessor(String systemDir, String storageGroupName,
TsFileFlushPolicy fileFlushPolicy) throws StorageGroupProcessorException {
this.storageGroupName = storageGroupName;
@@ -277,6 +357,9 @@ public class StorageGroupProcessor {
this.tsFileManagement = IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
.getTsFileManagement(storageGroupName, storageGroupSysDir.getAbsolutePath());
+ ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ executorService.scheduleWithFixedDelay(this::trimTask, DEFAULT_POOL_TRIM_INTERVAL_MILLIS,
+ DEFAULT_POOL_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
recover();
}
@@ -580,7 +663,7 @@ public class StorageGroupProcessor {
try {
// this tsfile is not zero level, no need to perform redo wal
if (LevelCompactionTsFileManagement.getMergeLevel(tsFileResource.getTsFile()) > 0) {
- writer = recoverPerformer.recover(false);
+ writer = recoverPerformer.recover(false, this::getWalDirectByteBuffer, this::releaseWalBuffer);
if (writer.hasCrashed()) {
tsFileManagement.addRecover(tsFileResource, isSeq);
} else {
@@ -589,7 +672,7 @@ public class StorageGroupProcessor {
}
continue;
} else {
- writer = recoverPerformer.recover(true);
+ writer = recoverPerformer.recover(true, this::getWalDirectByteBuffer, this::releaseWalBuffer);
}
} catch (StorageGroupProcessorException e) {
logger.warn("Skip TsFile: {} because of error in recover: ", tsFileResource.getTsFilePath(),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 5b404c9..c632782 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -177,8 +177,7 @@ public class TsFileProcessor {
if (enableMemControl) {
workMemTable = new PrimitiveMemTable(enableMemControl);
MemTableManager.getInstance().addMemtableNumber();
- }
- else {
+ } else {
workMemTable = MemTableManager.getInstance().getAvailableMemTable(storageGroupName);
}
}
@@ -226,8 +225,7 @@ public class TsFileProcessor {
if (enableMemControl) {
workMemTable = new PrimitiveMemTable(enableMemControl);
MemTableManager.getInstance().addMemtableNumber();
- }
- else {
+ } else {
workMemTable = MemTableManager.getInstance().getAvailableMemTable(storageGroupName);
}
}
@@ -871,7 +869,8 @@ public class TsFileProcessor {
public WriteLogNode getLogNode() {
if (logNode == null) {
logNode = MultiFileLogNodeManager.getInstance()
- .getNode(storageGroupName + "-" + tsFileResource.getTsFile().getName());
+ .getNode(storageGroupName + "-" + tsFileResource.getTsFile().getName(),
+ storageGroupInfo.getWalSupplier());
}
return logNode;
}
@@ -881,7 +880,8 @@ public class TsFileProcessor {
// when closing resource file, its corresponding mod file is also closed.
tsFileResource.close();
MultiFileLogNodeManager.getInstance()
- .deleteNode(storageGroupName + "-" + tsFileResource.getTsFile().getName());
+ .deleteNode(storageGroupName + "-" + tsFileResource.getTsFile().getName(),
+ storageGroupInfo.getWalConsumer());
} catch (IOException e) {
throw new TsFileProcessorException(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java b/server/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
new file mode 100644
index 0000000..8ca1cc6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+public class MmapUtil {
+
+ public static void clean(MappedByteBuffer mappedByteBuffer) {
+ if (mappedByteBuffer == null || !mappedByteBuffer.isDirect() || mappedByteBuffer.capacity()== 0)
+ return;
+ invoke(invoke(viewed(mappedByteBuffer), "cleaner"), "clean");
+ }
+
+ private static Object invoke(final Object target, final String methodName, final Class<?>... args) {
+ return AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
+ try {
+ Method method = method(target, methodName, args);
+ method.setAccessible(true);
+ return method.invoke(target);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ }
+
+ private static Method method(Object target, String methodName, Class<?>[] args)
+ throws NoSuchMethodException {
+ try {
+ return target.getClass().getMethod(methodName, args);
+ } catch (NoSuchMethodException e) {
+ return target.getClass().getDeclaredMethod(methodName, args);
+ }
+ }
+
+ private static ByteBuffer viewed(ByteBuffer buffer) {
+ String methodName = "viewedBuffer";
+ Method[] methods = buffer.getClass().getMethods();
+ for (Method method : methods) {
+ if (method.getName().equals("attachment")) {
+ methodName = "attachment";
+ break;
+ }
+ }
+ ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName);
+ if (viewedBuffer == null)
+ return buffer;
+ else
+ return viewed(viewedBuffer);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index f07d69f..00b9c44 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -19,11 +19,14 @@
package org.apache.iotdb.db.writelog.manager;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
@@ -35,18 +38,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * MultiFileLogNodeManager manages all ExclusiveWriteLogNodes, each manages WALs of a TsFile
- * (either seq or unseq).
+ * MultiFileLogNodeManager manages all ExclusiveWriteLogNodes, each manages WALs of a TsFile (either
+ * seq or unseq).
*/
public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
private static final Logger logger = LoggerFactory.getLogger(MultiFileLogNodeManager.class);
- private Map<String, WriteLogNode> nodeMap;
+ private final Map<String, WriteLogNode> nodeMap;
private ScheduledExecutorService executorService;
- private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private final void forceTask() {
+ private void forceTask() {
if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
logger.warn("system mode is read-only, the force flush WAL task is stopped");
return;
@@ -75,23 +78,16 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
@Override
- public WriteLogNode getNode(String identifier) {
- WriteLogNode node = nodeMap.get(identifier);
- if (node == null) {
- node = new ExclusiveWriteLogNode(identifier);
- WriteLogNode oldNode = nodeMap.putIfAbsent(identifier, node);
- if (oldNode != null) {
- return oldNode;
- }
- }
- return node;
+ public WriteLogNode getNode(String identifier, Supplier<ByteBuffer[]> supplier) {
+ return nodeMap
+ .computeIfAbsent(identifier, key -> new ExclusiveWriteLogNode(key, supplier.get()));
}
@Override
- public void deleteNode(String identifier) throws IOException {
+ public void deleteNode(String identifier, Consumer<ByteBuffer[]> consumer) throws IOException {
WriteLogNode node = nodeMap.remove(identifier);
if (node != null) {
- node.delete();
+ consumer.accept(node.delete());
}
}
@@ -148,9 +144,11 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
}
private static class InstanceHolder {
- private InstanceHolder(){}
- private static MultiFileLogNodeManager instance = new MultiFileLogNodeManager();
+ private InstanceHolder() {
+ }
+
+ private static final MultiFileLogNodeManager instance = new MultiFileLogNodeManager();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java b/server/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
index 84d02fc..f46d57a 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.writelog.manager;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
/**
@@ -33,14 +36,14 @@ public interface WriteLogNodeManager {
* @param identifier -identifier, the format: "{storageGroupName}-{BufferWrite/Overflow}-{
* nameOfTsFile}"
*/
- WriteLogNode getNode(String identifier);
+ WriteLogNode getNode(String identifier, Supplier<ByteBuffer[]> supplier);
/**
* Delete a log node. If the log node does not exist, this will be an empty operation.
*
* @param identifier -identifier
*/
- void deleteNode(String identifier) throws IOException;
+ void deleteNode(String identifier, Consumer<ByteBuffer[]> consumer) throws IOException;
/**
* Close all nodes.
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index f241db5..6c73713 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -59,10 +59,8 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private ByteBuffer logBufferWorking = ByteBuffer
- .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
- private ByteBuffer logBufferIdle = ByteBuffer
- .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ private ByteBuffer logBufferWorking;
+ private ByteBuffer logBufferIdle;
private ByteBuffer logBufferFlushing;
private final Object switchBufferCondition = new Object();
@@ -83,10 +81,12 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
*
* @param identifier ExclusiveWriteLogNode identifier
*/
- public ExclusiveWriteLogNode(String identifier) {
+ public ExclusiveWriteLogNode(String identifier, ByteBuffer[] byteBuffers) {
this.identifier = identifier;
this.logDirectory =
DirectoryManager.getInstance().getWALFolder() + File.separator + this.identifier;
+ this.logBufferWorking = byteBuffers[0];
+ this.logBufferIdle = byteBuffers[1];
if (SystemFileFactory.INSTANCE.getFile(logDirectory).mkdirs()) {
logger.info("create the WAL folder {}.", logDirectory);
}
@@ -197,12 +197,24 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
}
@Override
- public void delete() throws IOException {
+ public ByteBuffer[] delete() throws IOException {
lock.lock();
try {
close();
FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(logDirectory));
deleted = true;
+ ByteBuffer[] res = new ByteBuffer[2];
+ int index = 0;
+ if (logBufferWorking != null) {
+ res[index++] = logBufferWorking;
+ }
+ if (logBufferIdle != null) {
+ res[index++] = logBufferIdle;
+ }
+ if (logBufferFlushing != null) {
+ res[index] = logBufferFlushing;
+ }
+ return res;
} finally {
lock.unlock();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
index a93117b..4ca048f 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.writelog.node;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.writelog.io.ILogReader;
@@ -76,7 +77,7 @@ public interface WriteLogNode {
* Abandon all logs in this node and delete the log directory. Calling insert() after calling
* this method is undefined.
*/
- void delete() throws IOException;
+ ByteBuffer[] delete() throws IOException;
/**
* return an ILogReader which can iterate each log in this log node.
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 4be75cc..2426414 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -20,9 +20,11 @@
package org.apache.iotdb.db.writelog.recover;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
@@ -85,9 +87,10 @@ public class LogReplayer {
* finds the logNode of the TsFile given by insertFilePath and logNodePrefix, reads the WALs from
* the logNode and redoes them into a given MemTable and ModificationFile.
*/
- public void replayLogs() {
- WriteLogNode logNode = MultiFileLogNodeManager.getInstance().getNode(
- logNodePrefix + FSFactoryProducer.getFSFactory().getFile(insertFilePath).getName());
+ public void replayLogs(Supplier<ByteBuffer[]> supplier) {
+ WriteLogNode logNode = MultiFileLogNodeManager.getInstance()
+ .getNode(logNodePrefix + FSFactoryProducer.getFSFactory().getFile(insertFilePath).getName(),
+ supplier);
ILogReader logReader = logNode.getLogReader();
try {
@@ -171,7 +174,8 @@ public class LogReplayer {
if (plan instanceof InsertRowPlan) {
recoverMemTable.insert((InsertRowPlan) plan);
} else {
- recoverMemTable.insertTablet((InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+ recoverMemTable
+ .insertTablet((InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 8120d00..3418847 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -23,10 +23,13 @@ import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SU
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -81,8 +84,8 @@ public class TsFileRecoverPerformer {
* writing
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- public RestorableTsFileIOWriter recover(boolean needRedoWal)
- throws StorageGroupProcessorException {
+ public RestorableTsFileIOWriter recover(boolean needRedoWal, Supplier<ByteBuffer[]> supplier,
+ Consumer<ByteBuffer[]> consumer) throws StorageGroupProcessorException {
File file = FSFactoryProducer.getFSFactory().getFile(filePath);
if (!file.exists()) {
@@ -120,12 +123,13 @@ public class TsFileRecoverPerformer {
// redo logs
if (needRedoWal) {
- redoLogs(restorableTsFileIOWriter);
+ redoLogs(restorableTsFileIOWriter, supplier);
// clean logs
try {
MultiFileLogNodeManager.getInstance()
- .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
+ .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName(),
+ consumer);
} catch (IOException e) {
throw new StorageGroupProcessorException(e);
}
@@ -196,13 +200,13 @@ public class TsFileRecoverPerformer {
tsFileResource.updatePlanIndexes(restorableTsFileIOWriter.getMaxPlanIndex());
}
- private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter)
+ private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter, Supplier<ByteBuffer[]> supplier)
throws StorageGroupProcessorException {
IMemTable recoverMemTable = new PrimitiveMemTable();
recoverMemTable.setVersion(versionController.nextVersion());
LogReplayer logReplayer = new LogReplayer(logNodePrefix, filePath, tsFileResource.getModFile(),
versionController, tsFileResource, recoverMemTable, sequence);
- logReplayer.replayLogs();
+ logReplayer.replayLogs(supplier);
try {
if (!recoverMemTable.isEmpty()) {
// flush logs
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java
index 2a7a5ed..8386977 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java
@@ -20,12 +20,15 @@
package org.apache.iotdb.db.writelog;
import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
@@ -57,7 +60,7 @@ public class IoTDBLogFileSizeTest {
return;
}
groupSize = TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte();
- TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte( 8 * 1024 * 1024);
+ TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(8 * 1024 * 1024);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(8 * 1024 * 1024);
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.envSetUp();
@@ -81,6 +84,11 @@ public class IoTDBLogFileSizeTest {
return;
}
final long[] maxLength = {0};
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
Thread writeThread = new Thread(() -> {
int cnt = 0;
try {
@@ -100,7 +108,7 @@ public class IoTDBLogFileSizeTest {
cnt);
statement.execute(sql);
WriteLogNode logNode = MultiFileLogNodeManager.getInstance().getNode(
- "root.logFileTest.seq" + IoTDBConstant.SEQFILE_LOG_NODE_SUFFIX);
+ "root.logFileTest.seq" + IoTDBConstant.SEQFILE_LOG_NODE_SUFFIX, () -> buffers);
File bufferWriteWALFile = new File(
logNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME);
if (bufferWriteWALFile.exists() && bufferWriteWALFile.length() > maxLength[0]) {
@@ -117,6 +125,9 @@ public class IoTDBLogFileSizeTest {
while (writeThread.isAlive()) {
}
+ for (ByteBuffer byteBuffer : buffers) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
}
@Test
@@ -124,6 +135,11 @@ public class IoTDBLogFileSizeTest {
if (skip) {
return;
}
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
final long[] maxLength = {0};
Thread writeThread = new Thread(() -> {
int cnt = 0;
@@ -143,7 +159,7 @@ public class IoTDBLogFileSizeTest {
++cnt, cnt);
statement.execute(sql);
WriteLogNode logNode = MultiFileLogNodeManager.getInstance()
- .getNode("root.logFileTest.unsequence" + IoTDBConstant.UNSEQFILE_LOG_NODE_SUFFIX);
+ .getNode("root.logFileTest.unsequence" + IoTDBConstant.UNSEQFILE_LOG_NODE_SUFFIX, () -> buffers);
File WALFile = new File(
logNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME);
if (WALFile.exists() && WALFile.length() > maxLength[0]) {
@@ -160,6 +176,9 @@ public class IoTDBLogFileSizeTest {
while (writeThread.isAlive()) {
}
+ for (ByteBuffer byteBuffer : buffers) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
}
private void executeSQL(String[] sqls) throws ClassNotFoundException {
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
index 1d6e918..cc62db0 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.db.writelog;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.util.Collections;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -31,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -82,13 +85,20 @@ public class PerformanceTest {
tempRestore.createNewFile();
tempProcessorStore.createNewFile();
- WriteLogNode logNode = new ExclusiveWriteLogNode("root.testLogNode");
+ ByteBuffer[] byteBuffers = new ByteBuffer[2];
+ byteBuffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ byteBuffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+
+ WriteLogNode logNode = new ExclusiveWriteLogNode("root.testLogNode", byteBuffers);
long time = System.currentTimeMillis();
for (int i = 0; i < 1000000; i++) {
InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("logTestDevice"), 100,
new String[]{"s1", "s2", "s3", "s4"},
- new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
+ new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT,
+ TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50,
new PartialPath("root.logTestDevice.s1"));
@@ -102,7 +112,10 @@ public class PerformanceTest {
3000000 + " logs use " + (System.currentTimeMillis() - time) + " ms at batch size "
+ config.getFlushWalThreshold());
- logNode.delete();
+ ByteBuffer[] array = logNode.delete();
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
tempRestore.delete();
tempProcessorStore.delete();
tempRestore.getParentFile().delete();
@@ -130,25 +143,37 @@ public class PerformanceTest {
} catch (MetadataException ignored) {
}
IoTDB.metaManager
- .createTimeseries(new PartialPath("root.logTestDevice.s1"), TSDataType.DOUBLE, TSEncoding.PLAIN,
+ .createTimeseries(new PartialPath("root.logTestDevice.s1"), TSDataType.DOUBLE,
+ TSEncoding.PLAIN,
TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
IoTDB.metaManager
- .createTimeseries(new PartialPath("root.logTestDevice.s2"), TSDataType.INT32, TSEncoding.PLAIN,
+ .createTimeseries(new PartialPath("root.logTestDevice.s2"), TSDataType.INT32,
+ TSEncoding.PLAIN,
TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
IoTDB.metaManager
- .createTimeseries(new PartialPath("root.logTestDevice.s3"), TSDataType.TEXT, TSEncoding.PLAIN,
+ .createTimeseries(new PartialPath("root.logTestDevice.s3"), TSDataType.TEXT,
+ TSEncoding.PLAIN,
TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
IoTDB.metaManager
- .createTimeseries(new PartialPath("root.logTestDevice.s4"), TSDataType.BOOLEAN, TSEncoding.PLAIN,
+ .createTimeseries(new PartialPath("root.logTestDevice.s4"), TSDataType.BOOLEAN,
+ TSEncoding.PLAIN,
TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
- WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice");
+
+ ByteBuffer[] byteBuffers = new ByteBuffer[2];
+ byteBuffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ byteBuffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", byteBuffers);
for (int i = 0; i < 1000000; i++) {
InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("root.logTestDevice"), 100,
new String[]{"s1", "s2", "s3", "s4"},
- new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
+ new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT,
+ TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
- DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new PartialPath("root.logTestDevice.s1"));
+ DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50,
+ new PartialPath("root.logTestDevice.s1"));
logNode.write(bwInsertPlan);
logNode.write(deletePlan);
@@ -159,7 +184,10 @@ public class PerformanceTest {
System.out.println(
3000000 + " logs use " + (System.currentTimeMillis() - time) + "ms when recovering ");
} finally {
- logNode.delete();
+ ByteBuffer[] array = logNode.delete();
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
tempRestore.delete();
tempProcessorStore.delete();
tempRestore.getParentFile().delete();
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
index 332ee6b..cdff9cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
@@ -19,12 +19,15 @@
package org.apache.iotdb.db.writelog;
import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertNotSame;
import static junit.framework.TestCase.assertSame;
import static junit.framework.TestCase.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -32,11 +35,11 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -64,15 +67,45 @@ public class WriteLogNodeManagerTest {
public void testGetAndDelete() throws IOException {
String identifier = "testLogNode";
WriteLogNodeManager manager = MultiFileLogNodeManager.getInstance();
- WriteLogNode logNode = manager.getNode(identifier);
+ WriteLogNode logNode = manager.getNode(identifier, () -> {
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return buffers;
+ });
assertEquals(identifier, logNode.getIdentifier());
- WriteLogNode theSameNode = manager.getNode(identifier);
+ WriteLogNode theSameNode = manager.getNode(identifier, () -> {
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return buffers;
+ });
assertSame(logNode, theSameNode);
- manager.deleteNode(identifier);
- WriteLogNode anotherNode = manager.getNode(identifier);
+ manager.deleteNode(identifier, (ByteBuffer[] array) -> {
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
+ });
+ WriteLogNode anotherNode = manager.getNode(identifier, () -> {
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return buffers;
+ });
assertNotSame(logNode, anotherNode);
+ manager.deleteNode(identifier, (ByteBuffer[] array) -> {
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
+ });
}
@Test
@@ -84,17 +117,24 @@ public class WriteLogNodeManagerTest {
File tempProcessorStore = File.createTempFile("managerTest", "processorStore");
WriteLogNodeManager manager = MultiFileLogNodeManager.getInstance();
- WriteLogNode logNode = manager
- .getNode("root.managerTest");
+ WriteLogNode logNode = manager.getNode("root.managerTest", () -> {
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return buffers;
+ });
InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("logTestDevice"), 100,
new String[]{"s1", "s2", "s3", "s4"},
new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
- DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new PartialPath("root.logTestDevice.s1"));
+ DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50,
+ new PartialPath("root.logTestDevice.s1"));
File walFile = new File(logNode.getLogDirectory() + File.separator + "wal1");
- assertTrue(!walFile.exists());
+ assertFalse(walFile.exists());
logNode.write(bwInsertPlan);
logNode.write(deletePlan);
@@ -102,7 +142,10 @@ public class WriteLogNodeManagerTest {
Thread.sleep(config.getForceWalPeriodInMs() + 1000);
assertTrue(walFile.exists());
- logNode.delete();
+ ByteBuffer[] buffers = logNode.delete();
+ for (ByteBuffer byteBuffer : buffers) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
config.setForceWalPeriodInMs(flushWalPeriod);
tempRestore.delete();
tempProcessorStore.delete();
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index fe5610c..eadeb38 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -24,6 +24,8 @@ import static junit.framework.TestCase.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -34,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.iotdb.db.writelog.io.ILogReader;
import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
@@ -68,7 +71,12 @@ public class WriteLogNodeTest {
// then reads the logs from file
String identifier = "root.logTestDevice";
- WriteLogNode logNode = new ExclusiveWriteLogNode(identifier);
+ ByteBuffer[] byteBuffers = new ByteBuffer[2];
+ byteBuffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ byteBuffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ WriteLogNode logNode = new ExclusiveWriteLogNode(identifier, byteBuffers);
InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath(identifier), 100,
new String[]{"s1", "s2", "s3", "s4"},
@@ -96,7 +104,7 @@ public class WriteLogNodeTest {
}
InsertTabletPlan tabletPlan = new InsertTabletPlan(new PartialPath(identifier),
- new String[]{"s1", "s2", "s3", "s4"}, dataTypes);
+ new String[]{"s1", "s2", "s3", "s4"}, dataTypes);
tabletPlan.setTimes(times);
tabletPlan.setColumns(columns);
tabletPlan.setRowCount(times.length);
@@ -122,7 +130,10 @@ public class WriteLogNodeTest {
assertEquals(newPlan.getMeasurements().length, 3);
reader.close();
- logNode.delete();
+ ByteBuffer[] array = logNode.delete();
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
}
@Test
@@ -131,7 +142,12 @@ public class WriteLogNodeTest {
// then calls notifyStartFlush() and notifyEndFlush() to delete old file
String identifier = "root.logTestDevice";
- WriteLogNode logNode = new ExclusiveWriteLogNode(identifier);
+ ByteBuffer[] byteBuffers = new ByteBuffer[2];
+ byteBuffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ byteBuffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ WriteLogNode logNode = new ExclusiveWriteLogNode(identifier, byteBuffers);
InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath(identifier), 100,
new String[]{"s1", "s2", "s3", "s4"},
@@ -159,7 +175,10 @@ public class WriteLogNodeTest {
assertFalse(logReader.hasNext());
logReader.close();
- logNode.delete();
+ ByteBuffer[] array = logNode.delete();
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
}
@Test
@@ -168,13 +187,19 @@ public class WriteLogNodeTest {
int flushWalThreshold = config.getFlushWalThreshold();
config.setFlushWalThreshold(2);
- WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice");
+ ByteBuffer[] byteBuffers = new ByteBuffer[2];
+ byteBuffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ byteBuffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", byteBuffers);
InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("root.logTestDevice"), 100,
new String[]{"s1", "s2", "s3", "s4"},
new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
- DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new PartialPath("root.logTestDevice.s1"));
+ DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50,
+ new PartialPath("root.logTestDevice.s1"));
logNode.write(bwInsertPlan);
@@ -189,7 +214,10 @@ public class WriteLogNodeTest {
}
assertTrue(walFile.exists());
- logNode.delete();
+ ByteBuffer[] array = logNode.delete();
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
config.setFlushWalThreshold(flushWalThreshold);
}
@@ -198,13 +226,19 @@ public class WriteLogNodeTest {
// this test uses a dummy insert log node to insert a few logs and flushes them
// then deletes the node
- WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice");
+ ByteBuffer[] byteBuffers = new ByteBuffer[2];
+ byteBuffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ byteBuffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", byteBuffers);
InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("logTestDevice"), 100,
new String[]{"s1", "s2", "s3", "s4"},
new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
- DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new PartialPath("root.logTestDevice.s1"));
+ DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50,
+ new PartialPath("root.logTestDevice.s1"));
logNode.write(bwInsertPlan);
logNode.write(deletePlan);
@@ -219,16 +253,25 @@ public class WriteLogNodeTest {
}
assertTrue(new File(logNode.getLogDirectory()).exists());
- logNode.delete();
- assertTrue(!new File(logNode.getLogDirectory()).exists());
+ ByteBuffer[] array = logNode.delete();
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
+ assertFalse(new File(logNode.getLogDirectory()).exists());
}
@Test
public void testOverSizedWAL() throws IOException, IllegalPathException {
// this test uses a dummy insert log node to insert an over-sized log and assert exception caught
- WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice.oversize");
-
- InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("root.logTestDevice.oversize"), 100,
+ ByteBuffer[] byteBuffers = new ByteBuffer[2];
+ byteBuffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ byteBuffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice.oversize", byteBuffers);
+
+ InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("root.logTestDevice.oversize"),
+ 100,
new String[]{"s1", "s2", "s3", "s4"},
new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
new String[]{"1.0", "15", new String(new char[65 * 1024 * 1024]), "false"});
@@ -241,6 +284,9 @@ public class WriteLogNodeTest {
}
assertTrue(caught);
- logNode.delete();
+ ByteBuffer[] array = logNode.delete();
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index f7a5c63..a64c754 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -26,9 +26,12 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
@@ -50,6 +53,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -109,7 +113,12 @@ public class LogReplayerTest {
versionController, tsFileResource, memTable, false);
WriteLogNode node =
- MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName());
+ MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName(), () -> {
+ ByteBuffer[] byteBuffers = new ByteBuffer[2];
+ byteBuffers[0] = ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ byteBuffers[1] = ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return byteBuffers;
+ });
node.write(
new InsertRowPlan(new PartialPath("root.sg.device0"), 100, "sensor0", TSDataType.INT64,
String.valueOf(0)));
@@ -124,7 +133,12 @@ public class LogReplayerTest {
node.write(deletePlan);
node.close();
- replayer.replayLogs();
+ replayer.replayLogs(() -> {
+ ByteBuffer[] byteBuffers = new ByteBuffer[2];
+ byteBuffers[0] = ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ byteBuffers[1] = ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return byteBuffers;
+ });
for (int i = 0; i < 5; i++) {
ReadOnlyMemChunk memChunk = memTable
@@ -175,7 +189,11 @@ public class LogReplayerTest {
}
} finally {
modFile.close();
- MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + tsFile.getName());
+ MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + tsFile.getName(), (ByteBuffer[] byteBuffers) -> {
+ for (ByteBuffer byteBuffer : byteBuffers) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
+ });
modF.delete();
tsFile.delete();
tsFile.getParentFile().delete();
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
index 94d4feb..3a0bbe6 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
@@ -21,12 +21,15 @@ package org.apache.iotdb.db.writelog.recover;
import static org.junit.Assert.assertEquals;
-import java.util.*;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Collections;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -38,6 +41,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -87,7 +91,8 @@ public class RecoverResourceFromReaderTest {
schema = new Schema();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
- PartialPath path = new PartialPath("root.sg.device" + i + IoTDBConstant.PATH_SEPARATOR + "sensor" + j);
+ PartialPath path = new PartialPath(
+ "root.sg.device" + i + IoTDBConstant.PATH_SEPARATOR + "sensor" + j);
MeasurementSchema measurementSchema = new MeasurementSchema("sensor" + j, TSDataType.INT64,
TSEncoding.PLAIN);
schema.registerTimeseries(path.toTSFilePath(), measurementSchema);
@@ -99,17 +104,20 @@ public class RecoverResourceFromReaderTest {
schema.registerTimeseries(new Path(("root.sg.device99"), ("sensor4")),
new MeasurementSchema("sensor4", TSDataType.INT64, TSEncoding.PLAIN));
IoTDB.metaManager
- .createTimeseries(new PartialPath("root.sg.device99.sensor4"), TSDataType.INT64, TSEncoding.PLAIN,
+ .createTimeseries(new PartialPath("root.sg.device99.sensor4"), TSDataType.INT64,
+ TSEncoding.PLAIN,
TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
schema.registerTimeseries(new Path(("root.sg.device99"), ("sensor2")),
new MeasurementSchema("sensor2", TSDataType.INT64, TSEncoding.PLAIN));
IoTDB.metaManager
- .createTimeseries(new PartialPath("root.sg.device99.sensor2"), TSDataType.INT64, TSEncoding.PLAIN,
+ .createTimeseries(new PartialPath("root.sg.device99.sensor2"), TSDataType.INT64,
+ TSEncoding.PLAIN,
TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
schema.registerTimeseries(new Path(("root.sg.device99"), ("sensor1")),
new MeasurementSchema("sensor1", TSDataType.INT64, TSEncoding.PLAIN));
IoTDB.metaManager
- .createTimeseries(new PartialPath("root.sg.device99.sensor1"), TSDataType.INT64, TSEncoding.PLAIN,
+ .createTimeseries(new PartialPath("root.sg.device99.sensor1"), TSDataType.INT64,
+ TSEncoding.PLAIN,
TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
writer = new TsFileWriter(tsF, schema);
@@ -134,7 +142,14 @@ public class RecoverResourceFromReaderTest {
writer.flushAllChunkGroups();
writer.getIOWriter().close();
- node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName());
+ node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName(), () -> {
+ ByteBuffer[] byteBuffers = new ByteBuffer[2];
+ byteBuffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ byteBuffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return byteBuffers;
+ });
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
String[] measurements = new String[10];
@@ -145,16 +160,19 @@ public class RecoverResourceFromReaderTest {
types[k] = TSDataType.INT64;
values[k] = String.valueOf(k + 10);
}
- InsertRowPlan insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device" + j), i, measurements,
+ InsertRowPlan insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device" + j), i,
+ measurements,
types, values);
node.write(insertRowPlan);
}
node.notifyStartFlush();
}
- InsertRowPlan insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device99"), 1, new String[]{"sensor4"},
+ InsertRowPlan insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device99"), 1,
+ new String[]{"sensor4"},
new TSDataType[]{TSDataType.INT64}, new String[]{"4"});
node.write(insertRowPlan);
- insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device99"), 300, new String[]{"sensor2"},
+ insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device99"), 300,
+ new String[]{"sensor2"},
new TSDataType[]{TSDataType.INT64}, new String[]{"2"});
node.write(insertRowPlan);
node.close();
@@ -167,7 +185,10 @@ public class RecoverResourceFromReaderTest {
EnvironmentUtils.cleanEnv();
FileUtils.deleteDirectory(tsF.getParentFile());
resource.close();
- node.delete();
+ ByteBuffer[] array = node.delete();
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
}
@Test
@@ -183,7 +204,18 @@ public class RecoverResourceFromReaderTest {
TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController,
resource, false, false);
- performer.recover(true).close();
+ performer.recover(true, () -> {
+ ByteBuffer[] byteBuffers = new ByteBuffer[2];
+ byteBuffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ byteBuffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return byteBuffers;
+ }, (ByteBuffer[] byteBuffers) -> {
+ for (ByteBuffer byteBuffer : byteBuffers) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
+ }).close();
assertEquals(1, resource.getStartTime("root.sg.device99"));
assertEquals(300, resource.getEndTime("root.sg.device99"));
for (int i = 0; i < 10; i++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index f4ff7cf..5c3ea2d 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -21,16 +21,18 @@ package org.apache.iotdb.db.writelog.recover;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -42,6 +44,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -99,7 +102,8 @@ public class SeqTsFileRecoverTest {
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
IoTDB.metaManager
- .createTimeseries(new PartialPath("root.sg.device" + i + ".sensor" + j), TSDataType.INT64,
+ .createTimeseries(new PartialPath("root.sg.device" + i + ".sensor" + j),
+ TSDataType.INT64,
TSEncoding.PLAIN, TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
}
@@ -138,7 +142,14 @@ public class SeqTsFileRecoverTest {
writer.flushAllChunkGroups();
writer.getIOWriter().close();
- node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName());
+ node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName(), () -> {
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return buffers;
+ });
for (int i = 10; i < 20; i++) {
for (int j = 0; j < 10; j++) {
String[] measurements = new String[10];
@@ -149,7 +160,8 @@ public class SeqTsFileRecoverTest {
types[k] = TSDataType.INT64;
values[k] = String.valueOf(k);
}
- InsertRowPlan insertPlan = new InsertRowPlan(new PartialPath("root.sg.device" + j), i, measurements, types,
+ InsertRowPlan insertPlan = new InsertRowPlan(new PartialPath("root.sg.device" + j), i,
+ measurements, types,
values);
node.write(insertPlan);
}
@@ -164,14 +176,28 @@ public class SeqTsFileRecoverTest {
EnvironmentUtils.cleanEnv();
FileUtils.deleteDirectory(tsF.getParentFile());
resource.close();
- node.delete();
+ ByteBuffer[] buffers = node.delete();
+ for (ByteBuffer byteBuffer : buffers) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
}
@Test
public void testNonLastRecovery() throws StorageGroupProcessorException, IOException {
TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController,
resource, false, false);
- RestorableTsFileIOWriter writer = performer.recover(true);
+ RestorableTsFileIOWriter writer = performer.recover(true, () -> {
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return buffers;
+ }, (ByteBuffer[] array) -> {
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
+ });
assertFalse(writer.canWrite());
writer.close();
@@ -220,7 +246,18 @@ public class SeqTsFileRecoverTest {
public void testLastRecovery() throws StorageGroupProcessorException, IOException {
TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController,
resource, false, true);
- RestorableTsFileIOWriter writer = performer.recover(true);
+ RestorableTsFileIOWriter writer = performer.recover(true, () -> {
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return buffers;
+ }, (ByteBuffer[] array) -> {
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
+ });
writer.makeMetadataVisible();
assertEquals(11, writer.getMetadatasForQuery().size());
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 8caa93a..2d4562f 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.util.Collections;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -39,6 +42,7 @@ import org.apache.iotdb.db.query.reader.chunk.ChunkDataIterator;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -142,7 +146,14 @@ public class UnseqTsFileRecoverTest {
writer.flushAllChunkGroups();
writer.getIOWriter().close();
- node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName());
+ node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName(), () -> {
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return buffers;
+ });
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
String[] measurements = new String[10];
@@ -172,7 +183,10 @@ public class UnseqTsFileRecoverTest {
public void tearDown() throws IOException, StorageEngineException {
FileUtils.deleteDirectory(tsF.getParentFile());
resource.close();
- node.delete();
+ ByteBuffer[] array = node.delete();
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
EnvironmentUtils.cleanEnv();
}
@@ -180,7 +194,18 @@ public class UnseqTsFileRecoverTest {
public void test() throws StorageGroupProcessorException, IOException {
TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix,
versionController, resource, false, false);
- performer.recover(true).close();
+ performer.recover(true, () -> {
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] = ByteBuffer
+ .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return buffers;
+ }, (ByteBuffer[] array) -> {
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
+ }).close();
assertEquals(1, resource.getStartTime("root.sg.device99"));
assertEquals(300, resource.getEndTime("root.sg.device99"));