You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/26 01:16:54 UTC
[iotdb] branch master updated: [IOTDB-5925] Pipe: WAL Resource Management (#9948)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 94212b64c74 [IOTDB-5925] Pipe: WAL Resource Management (#9948)
94212b64c74 is described below
commit 94212b64c74d94e268ed71ad7ac13e60cfcc36d1
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri May 26 09:16:48 2023 +0800
[IOTDB-5925] Pipe: WAL Resource Management (#9948)
---
.../iotdb/commons/concurrent/ThreadName.java | 1 +
.../db/engine/storagegroup/TsFileProcessor.java | 33 +++--
.../listener/PipeInsertionDataNodeListener.java | 10 +-
.../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 3 +-
.../core/event/impl/PipeTabletInsertionEvent.java | 53 ++++---
.../realtime/PipeRealtimeCollectEventFactory.java | 5 +-
.../db/pipe/resource/PipeResourceManager.java | 9 ++
.../{ => file}/PipeFileResourceManager.java | 2 +-
.../db/pipe/resource/wal/PipeWALResource.java | 162 +++++++++++++++++++++
.../pipe/resource/wal/PipeWALResourceManager.java | 111 ++++++++++++++
.../db/pipe/task/subtask/PipeConnectorSubtask.java | 9 +-
.../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 8 +-
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 4 +-
.../{WALPipeHandler.java => WALEntryHandler.java} | 22 ++-
.../db/wal/utils/listener/WALFlushListener.java | 10 +-
.../core/collector/PipeRealtimeCollectTest.java | 6 +
.../pipe/resource/PipeFileResourceManagerTest.java | 1 +
...peHandlerTest.java => WALEntryHandlerTest.java} | 18 +--
.../iotdb/db/wal/utils/WALInsertNodeCacheTest.java | 8 +-
19 files changed, 409 insertions(+), 66 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index d156287e1f3..718c5a6bfc1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -72,6 +72,7 @@ public enum ThreadName {
PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
PIPE_META_SYNC_SERVICE("Pipe-Meta-Sync-Service"),
+ PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE("Pipe-WAL-Resource-TTL-Checker-Service"),
;
private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 55f06629258..f89f28f9b2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -258,8 +258,9 @@ public class TsFileProcessor {
}
long startTime = System.nanoTime();
+ WALFlushListener walFlushListener;
try {
- WALFlushListener walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowNode);
+ walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowNode);
if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
throw walFlushListener.getCause();
}
@@ -277,17 +278,20 @@ public class TsFileProcessor {
}
startTime = System.nanoTime();
+
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToInsertNode(
+ dataRegionInfo.getDataRegion().getDataRegionId(),
+ walFlushListener.getWalEntryHandler(),
+ insertRowNode,
+ tsFileResource);
+
if (insertRowNode.isAligned()) {
workMemTable.insertAlignedRow(insertRowNode);
} else {
workMemTable.insert(insertRowNode);
}
- // collect plan node in pipe
- PipeInsertionDataNodeListener.getInstance()
- .listenToInsertNode(
- dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource);
-
// update start time of this memtable
tsFileResource.updateStartTime(
insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime());
@@ -358,9 +362,9 @@ public class TsFileProcessor {
}
long startTime = System.nanoTime();
+ WALFlushListener walFlushListener;
try {
- WALFlushListener walFlushListener =
- walNode.log(workMemTable.getMemTableId(), insertTabletNode, start, end);
+ walFlushListener = walNode.log(workMemTable.getMemTableId(), insertTabletNode, start, end);
if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
throw walFlushListener.getCause();
}
@@ -377,6 +381,14 @@ public class TsFileProcessor {
}
startTime = System.nanoTime();
+
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToInsertNode(
+ dataRegionInfo.getDataRegion().getDataRegionId(),
+ walFlushListener.getWalEntryHandler(),
+ insertTabletNode,
+ tsFileResource);
+
try {
if (insertTabletNode.isAligned()) {
workMemTable.insertAlignedTablet(insertTabletNode, start, end);
@@ -393,11 +405,6 @@ public class TsFileProcessor {
results[i] = RpcUtils.SUCCESS_STATUS;
}
- // collect plan node in pipe
- PipeInsertionDataNodeListener.getInstance()
- .listenToInsertNode(
- dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource);
-
tsFileResource.updateStartTime(
insertTabletNode.getDeviceID().toStringID(), insertTabletNode.getTimes()[start]);
// for sequence tsfile, we update the endTime only when the file is prepared to be closed.
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
index 778a583f1c7..d39a2fb9dc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.assigner.PipeDataRegionAssigner;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEventFactory;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -84,9 +85,11 @@ public class PipeInsertionDataNodeListener {
assigner.publishToAssign(PipeRealtimeCollectEventFactory.createCollectEvent(tsFileResource));
}
- // TODO: check whether the method is called on the right place.
public void listenToInsertNode(
- String dataRegionId, InsertNode insertNode, TsFileResource tsFileResource) {
+ String dataRegionId,
+ WALEntryHandler walEntryHandler,
+ InsertNode insertNode,
+ TsFileResource tsFileResource) {
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
// only events from registered data region will be collected
@@ -95,7 +98,8 @@ public class PipeInsertionDataNodeListener {
}
assigner.publishToAssign(
- PipeRealtimeCollectEventFactory.createCollectEvent(insertNode, tsFileResource));
+ PipeRealtimeCollectEventFactory.createCollectEvent(
+ walEntryHandler, insertNode, tsFileResource));
}
/////////////////////////////// singleton ///////////////////////////////
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 6b414c03b36..65837f31d42 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -130,7 +131,7 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
}
private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
- throws PipeException, TException {
+ throws PipeException, TException, WALPipeException {
final TPipeTransferResp resp =
client.pipeTransfer(
PipeTransferInsertNodeReq.toTPipeTransferReq(pipeTabletInsertionEvent.getInsertNode()));
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index 62c599b66c1..99d9da055ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -21,28 +21,32 @@ package org.apache.iotdb.db.pipe.core.event.impl;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.wal.exception.WALPipeException;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedEvent {
- private final InsertNode insertNode;
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletInsertionEvent.class);
- private final AtomicInteger referenceCount;
+ private final WALEntryHandler walEntryHandler;
- public PipeTabletInsertionEvent(InsertNode insertNode) {
- this.insertNode = insertNode;
- this.referenceCount = new AtomicInteger(0);
+ public PipeTabletInsertionEvent(WALEntryHandler walEntryHandler) {
+ this.walEntryHandler = walEntryHandler;
}
- public InsertNode getInsertNode() {
- return insertNode;
+ public InsertNode getInsertNode() throws WALPipeException {
+ return walEntryHandler.getValue();
}
@Override
@@ -62,26 +66,41 @@ public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedE
@Override
public boolean increaseReferenceCount(String holderMessage) {
- // TODO: use WALPipeHandler pinMemtable
- referenceCount.incrementAndGet();
- return true;
+ try {
+ PipeResourceManager.wal().pin(walEntryHandler.getMemTableId(), walEntryHandler);
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn(
+ String.format(
+ "Increase reference count for memtable %d error. Holder Message: %s",
+ walEntryHandler.getMemTableId(), holderMessage),
+ e);
+ return false;
+ }
}
@Override
public boolean decreaseReferenceCount(String holderMessage) {
- // TODO: use WALPipeHandler unpinMemetable
- referenceCount.decrementAndGet();
- return true;
+ try {
+ PipeResourceManager.wal().unpin(walEntryHandler.getMemTableId());
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn(
+ String.format(
+ "Decrease reference count for memtable %d error. Holder Message: %s",
+ walEntryHandler.getMemTableId(), holderMessage),
+ e);
+ return false;
+ }
}
@Override
public int getReferenceCount() {
- // TODO: use WALPipeHandler unpinMemetable
- return referenceCount.get();
+ return PipeResourceManager.wal().getReferenceCount(walEntryHandler.getMemTableId());
}
@Override
public String toString() {
- return "PipeTabletInsertionEvent{" + "insertNode=" + insertNode + '}';
+ return "PipeTabletInsertionEvent{" + "walEntryHandler=" + walEntryHandler + '}';
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index 4c98c5193be..a4b453e4322 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
public class PipeRealtimeCollectEventFactory {
@@ -34,9 +35,9 @@ public class PipeRealtimeCollectEventFactory {
}
public static PipeRealtimeCollectEvent createCollectEvent(
- InsertNode node, TsFileResource resource) {
+ WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource resource) {
return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(
- new PipeTabletInsertionEvent(node), node, resource);
+ new PipeTabletInsertionEvent(walEntryHandler), insertNode, resource);
}
private PipeRealtimeCollectEventFactory() {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
index 61b4e61a04e..43bddd872f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
@@ -19,18 +19,27 @@
package org.apache.iotdb.db.pipe.resource;
+import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
+import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
+
public class PipeResourceManager {
private final PipeFileResourceManager pipeFileResourceManager;
+ private final PipeWALResourceManager pipeWALResourceManager;
public static PipeFileResourceManager file() {
return PipeResourceManagerHolder.INSTANCE.pipeFileResourceManager;
}
+ public static PipeWALResourceManager wal() {
+ return PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager;
+ }
+
///////////////////////////// SINGLETON /////////////////////////////
private PipeResourceManager() {
pipeFileResourceManager = new PipeFileResourceManager();
+ pipeWALResourceManager = new PipeWALResourceManager();
}
private static class PipeResourceManagerHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
index e7d961b3c9f..942ab600536 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.resource;
+package org.apache.iotdb.db.pipe.resource.file;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.utils.FileUtils;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
new file mode 100644
index 00000000000..9ccb946ae45
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.wal;
+
+import org.apache.iotdb.db.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PipeWALResource implements AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeWALResource.class);
+
+ private final WALEntryHandler walEntryHandler;
+
+ private final AtomicInteger referenceCount;
+
+ // TODO: make this configurable
+ public static final long MIN_TIME_TO_LIVE_IN_MS = 1000 * 60;
+ private final AtomicLong lastLogicalPinTime;
+ private final AtomicBoolean isPhysicallyPinned;
+
+ public PipeWALResource(WALEntryHandler walEntryHandler) {
+ this.walEntryHandler = walEntryHandler;
+
+ referenceCount = new AtomicInteger(0);
+
+ lastLogicalPinTime = new AtomicLong(0);
+ isPhysicallyPinned = new AtomicBoolean(false);
+ }
+
+ public void pin() throws PipeRuntimeNonCriticalException {
+ if (referenceCount.get() == 0) {
+ if (!isPhysicallyPinned.get()) {
+ try {
+ walEntryHandler.pinMemTable();
+ } catch (MemTablePinException e) {
+ throw new PipeRuntimeNonCriticalException(
+ String.format(
+ "failed to pin wal %d, because %s",
+ walEntryHandler.getMemTableId(), e.getMessage()));
+ }
+ isPhysicallyPinned.set(true);
+ LOGGER.info("wal {} is pinned by pipe engine", walEntryHandler.getMemTableId());
+ } // else means the wal is already pinned, do nothing
+
+ // no matter the wal is pinned or not, update the last pin time
+ lastLogicalPinTime.set(System.currentTimeMillis());
+ }
+
+ referenceCount.incrementAndGet();
+ }
+
+ public void unpin() throws PipeRuntimeNonCriticalException {
+ final int finalReferenceCount = referenceCount.get();
+
+ if (finalReferenceCount == 1) {
+ unpinPhysicallyIfOutOfTimeToLive();
+ } else if (finalReferenceCount < 1) {
+ throw new PipeRuntimeCriticalException(
+ String.format(
+ "wal %d is unpinned more than pinned, this should not happen",
+ walEntryHandler.getMemTableId()));
+ }
+
+ referenceCount.decrementAndGet();
+ }
+
+ /**
+ * Invalidate the wal if it is unpinned and out of time to live.
+ *
+ * @return true if the wal is invalidated, false otherwise
+ */
+ public boolean invalidateIfPossible() {
+ if (referenceCount.get() > 0) {
+ return false;
+ }
+
+ // referenceCount.get() == 0
+ return unpinPhysicallyIfOutOfTimeToLive();
+ }
+
+ /**
+ * Unpin the wal if it is out of time to live.
+ *
+ * @return true if the wal is unpinned physically (then it can be invalidated), false otherwise
+ */
+ private boolean unpinPhysicallyIfOutOfTimeToLive() {
+ if (isPhysicallyPinned.get()) {
+ if (System.currentTimeMillis() - lastLogicalPinTime.get() > MIN_TIME_TO_LIVE_IN_MS) {
+ try {
+ walEntryHandler.unpinMemTable();
+ } catch (MemTablePinException e) {
+ throw new PipeRuntimeNonCriticalException(
+ String.format(
+ "failed to unpin wal %d, because %s",
+ walEntryHandler.getMemTableId(), e.getMessage()));
+ }
+ isPhysicallyPinned.set(false);
+ LOGGER.info(
+ "wal {} is unpinned by pipe engine when checking time to live",
+ walEntryHandler.getMemTableId());
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ LOGGER.info(
+ "wal {} is not pinned physically when checking time to live",
+ walEntryHandler.getMemTableId());
+ return true;
+ }
+ }
+
+ @Override
+ public void close() {
+ if (isPhysicallyPinned.get()) {
+ try {
+ walEntryHandler.unpinMemTable();
+ } catch (MemTablePinException e) {
+ LOGGER.error(
+ "failed to unpin wal {} when closing pipe wal resource, because {}",
+ walEntryHandler.getMemTableId(),
+ e.getMessage());
+ }
+ isPhysicallyPinned.set(false);
+ LOGGER.info(
+ "wal {} is unpinned by pipe engine when closing pipe wal resource",
+ walEntryHandler.getMemTableId());
+ }
+
+ referenceCount.set(0);
+ }
+
+ public int getReferenceCount() {
+ return referenceCount.get();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
new file mode 100644
index 00000000000..18284de32f4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -0,0 +1,111 @@
+/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless r [...]
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class PipeWALResourceManager implements AutoCloseable {
+
+ private final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
+
+ private static final int SEGMENT_LOCK_COUNT = 32;
+ private final ReentrantLock[] memtableIdSegmentLocks;
+
+ private static final ScheduledExecutorService PIPE_WAL_RESOURCE_TTL_CHECKER =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE.getName());
+ private final ScheduledFuture<?> ttlCheckerFuture;
+
+ public PipeWALResourceManager() {
+ memtableIdToPipeWALResourceMap = new HashMap<>();
+
+ memtableIdSegmentLocks = new ReentrantLock[SEGMENT_LOCK_COUNT];
+ for (int i = 0; i < SEGMENT_LOCK_COUNT; i++) {
+ memtableIdSegmentLocks[i] = new ReentrantLock();
+ }
+
+ ttlCheckerFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ PIPE_WAL_RESOURCE_TTL_CHECKER,
+ () -> {
+ for (final long memtableId : memtableIdToPipeWALResourceMap.keySet()) {
+ final ReentrantLock lock =
+ memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+ lock.lock();
+ try {
+ if (memtableIdToPipeWALResourceMap.get(memtableId).invalidateIfPossible()) {
+ memtableIdToPipeWALResourceMap.remove(memtableId);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ },
+ PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
+ PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void pin(long memtableId, WALEntryHandler walEntryHandler) {
+ final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+ lock.lock();
+ try {
+ memtableIdToPipeWALResourceMap
+ .computeIfAbsent(memtableId, id -> new PipeWALResource(walEntryHandler))
+ .pin();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void unpin(long memtableId) {
+ final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+ lock.lock();
+ try {
+ memtableIdToPipeWALResourceMap.get(memtableId).unpin();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (ttlCheckerFuture != null) {
+ ttlCheckerFuture.cancel(true);
+ }
+
+ for (final long memtableId : memtableIdToPipeWALResourceMap.keySet()) {
+ final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+ lock.lock();
+ try {
+ memtableIdToPipeWALResourceMap.get(memtableId).close();
+ memtableIdToPipeWALResourceMap.remove(memtableId);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ public int getReferenceCount(long memtableId) {
+ final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+ lock.lock();
+ try {
+ return memtableIdToPipeWALResourceMap.get(memtableId).getReferenceCount();
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 742b2230fa6..df0ea7d1fcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -39,6 +39,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
private final ListenableBlockingPendingQueue<Event> inputPendingQueue;
private final PipeConnector outputPipeConnector;
+ private static final int HEARTBEAT_CHECK_INTERVAL = 1000;
+ private int executeOnceInvokedTimes;
+
/** @param taskID connectorAttributeSortedString */
public PipeConnectorSubtask(
String taskID,
@@ -47,13 +50,15 @@ public class PipeConnectorSubtask extends PipeSubtask {
super(taskID);
this.inputPendingQueue = inputPendingQueue;
this.outputPipeConnector = outputPipeConnector;
+ executeOnceInvokedTimes = 0;
}
@Override
protected synchronized boolean executeOnce() {
try {
- // TODO: reduce the frequency of heartbeat
- outputPipeConnector.heartbeat();
+ if (executeOnceInvokedTimes++ % HEARTBEAT_CHECK_INTERVAL == 0) {
+ outputPipeConnector.heartbeat();
+ }
} catch (Exception e) {
throw new PipeConnectionException(
"PipeConnector: failed to connect to the target system.", e);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index bd4a5dd5f13..9aff438a835 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -256,7 +256,7 @@ public class WALBuffer extends AbstractWALBuffer {
// update related info
totalSize += size;
info.metaData.add(size, searchIndex);
- walEntry.getWalFlushListener().getWalPipeHandler().setSize(size);
+ walEntry.getWalFlushListener().getWalEntryHandler().setSize(size);
info.fsyncListeners.add(walEntry.getWalFlushListener());
}
@@ -495,9 +495,9 @@ public class WALBuffer extends AbstractWALBuffer {
if (forceSuccess) {
for (WALFlushListener fsyncListener : info.fsyncListeners) {
fsyncListener.succeed();
- if (fsyncListener.getWalPipeHandler() != null) {
- fsyncListener.getWalPipeHandler().setEntryPosition(walFileVersionId, position);
- position += fsyncListener.getWalPipeHandler().getSize();
+ if (fsyncListener.getWalEntryHandler() != null) {
+ fsyncListener.getWalEntryHandler().setEntryPosition(walFileVersionId, position);
+ position += fsyncListener.getWalEntryHandler().getSize();
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 4153c3c49f5..3eacc838cec 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -150,8 +150,8 @@ public class WALNode implements IWALNode {
private WALFlushListener log(WALEntry walEntry) {
buffer.write(walEntry);
// set handler for pipe
- walEntry.getWalFlushListener().getWalPipeHandler().setMemTableId(walEntry.getMemTableId());
- walEntry.getWalFlushListener().getWalPipeHandler().setWalNode(this);
+ walEntry.getWalFlushListener().getWalEntryHandler().setMemTableId(walEntry.getMemTableId());
+ walEntry.getWalFlushListener().getWalEntryHandler().setWalNode(this);
return walEntry.getWalFlushListener();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryHandler.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
rename to server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryHandler.java
index abdb4771a93..e8849dc6536 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryHandler.java
@@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory;
* This handler is used by the Pipe to find the corresponding insert node. Besides, it can try to
* pin/unpin the wal entries by the memTable id.
*/
-public class WALPipeHandler {
- private static final Logger logger = LoggerFactory.getLogger(WALPipeHandler.class);
+public class WALEntryHandler {
+ private static final Logger logger = LoggerFactory.getLogger(WALEntryHandler.class);
private long memTableId = -1;
/** cached value, null after this value is flushed to wal successfully */
@@ -42,7 +42,7 @@ public class WALPipeHandler {
/** wal node, null when wal is disabled */
private WALNode walNode = null;
- public WALPipeHandler(WALEntryValue value) {
+ public WALEntryHandler(WALEntryValue value) {
this.value = value;
}
@@ -101,6 +101,10 @@ public class WALPipeHandler {
}
}
+ public long getMemTableId() {
+ return memTableId;
+ }
+
public void setMemTableId(long memTableId) {
this.memTableId = memTableId;
}
@@ -129,4 +133,16 @@ public class WALPipeHandler {
public void setSize(int size) {
this.walEntryPosition.setSize(size);
}
+
+ @Override
+ public String toString() {
+ return "WALEntryHandler{"
+ + "memTableId="
+ + memTableId
+ + ", value="
+ + value
+ + ", walEntryPosition="
+ + walEntryPosition
+ + '}';
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
index 1b468635899..2fecd1f585c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
@@ -19,19 +19,19 @@
package org.apache.iotdb.db.wal.utils.listener;
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
-import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
/** This class helps judge whether wal is flushed to the storage device. */
public class WALFlushListener extends AbstractResultListener {
// handler for pipeline, only exists then value is InsertNode
- private final WALPipeHandler walPipeHandler;
+ private final WALEntryHandler walEntryHandler;
public WALFlushListener(boolean wait, WALEntryValue value) {
super(wait);
- walPipeHandler = new WALPipeHandler(value);
+ walEntryHandler = new WALEntryHandler(value);
}
- public WALPipeHandler getWalPipeHandler() {
- return walPipeHandler;
+ public WALEntryHandler getWalEntryHandler() {
+ return walEntryHandler;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 7cd705af588..45b0cf504b2 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCo
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -57,6 +58,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import static org.mockito.Mockito.mock;
+
public class PipeRealtimeCollectTest {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeCollectTest.class);
@@ -229,6 +232,7 @@ public class PipeRealtimeCollectTest {
}
private Future<?> write2DataRegion(int writeNum, String dataRegionId, int startNum) {
+
File dataRegionDir =
new File(tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0");
boolean ignored = dataRegionDir.mkdirs();
@@ -249,6 +253,7 @@ public class PipeRealtimeCollectTest {
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionId,
+ mock(WALEntryHandler.class),
new InsertRowNode(
new PlanNodeId(String.valueOf(i)),
new PartialPath(device),
@@ -262,6 +267,7 @@ public class PipeRealtimeCollectTest {
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionId,
+ mock(WALEntryHandler.class),
new InsertRowNode(
new PlanNodeId(String.valueOf(i)),
new PartialPath(device),
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
index b2441aa9d9f..ef86b0db285 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.TsFileWriter;
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/WALEntryHandlerTest.java
similarity index 94%
rename from server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java
rename to server/src/test/java/org/apache/iotdb/db/wal/node/WALEntryHandlerTest.java
index 77c28c6d333..d34ca54cae8 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/WALEntryHandlerTest.java
@@ -33,8 +33,8 @@ import org.apache.iotdb.db.wal.buffer.WALEntry;
import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
import org.apache.iotdb.db.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import org.apache.iotdb.db.wal.utils.WALMode;
-import org.apache.iotdb.db.wal.utils.WALPipeHandler;
import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -49,7 +49,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-public class WALPipeHandlerTest {
+public class WALEntryHandlerTest {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final String identifier = String.valueOf(Integer.MAX_VALUE);
private static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
@@ -85,7 +85,7 @@ public class WALPipeHandlerTest {
memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
walNode.onMemTableFlushed(memTable);
// pin flushed memTable
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
}
@@ -97,7 +97,7 @@ public class WALPipeHandlerTest {
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
// pin memTable
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
walNode.onMemTableFlushed(memTable);
// roll wal file
@@ -128,7 +128,7 @@ public class WALPipeHandlerTest {
memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
walNode.onMemTableFlushed(memTable);
// unpin flushed memTable
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.unpinMemTable();
}
@@ -139,7 +139,7 @@ public class WALPipeHandlerTest {
WALFlushListener flushListener =
walNode.log(
memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
// pin twice
handler.pinMemTable();
handler.pinMemTable();
@@ -164,7 +164,7 @@ public class WALPipeHandlerTest {
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
// pin memTable
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
walNode.onMemTableFlushed(memTable);
// roll wal file
@@ -193,7 +193,7 @@ public class WALPipeHandlerTest {
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
// pin memTable
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
walNode.onMemTableFlushed(memTable);
assertEquals(node1, handler.getValue());
@@ -207,7 +207,7 @@ public class WALPipeHandlerTest {
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
// pin memTable
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
walNode.onMemTableFlushed(memTable);
// wait until wal flushed
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
index 98459619a70..1e5dc245cc9 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
@@ -79,7 +79,7 @@ public class WALInsertNodeCacheTest {
InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
- WALEntryPosition position = flushListener.getWalPipeHandler().getWalEntryPosition();
+ WALEntryPosition position = flushListener.getWalEntryHandler().getWalEntryPosition();
// wait until wal flushed
while (!walNode.isAllWALEntriesConsumed() || !position.canRead()) {
Thread.sleep(50);
@@ -96,18 +96,18 @@ public class WALInsertNodeCacheTest {
InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
node1.setSearchIndex(1);
WALFlushListener flushListener1 = walNode.log(memTable1.getMemTableId(), node1);
- WALEntryPosition position1 = flushListener1.getWalPipeHandler().getWalEntryPosition();
+ WALEntryPosition position1 = flushListener1.getWalEntryHandler().getWalEntryPosition();
InsertRowNode node2 = getInsertRowNode(devicePath, System.currentTimeMillis());
node1.setSearchIndex(2);
WALFlushListener flushListener2 = walNode.log(memTable1.getMemTableId(), node2);
- WALEntryPosition position2 = flushListener2.getWalPipeHandler().getWalEntryPosition();
+ WALEntryPosition position2 = flushListener2.getWalEntryHandler().getWalEntryPosition();
// write memTable2
IMemTable memTable2 = new PrimitiveMemTable();
walNode.onMemTableCreated(memTable2, logDirectory + "/" + "fake2.tsfile");
InsertRowNode node3 = getInsertRowNode(devicePath, System.currentTimeMillis());
node1.setSearchIndex(3);
WALFlushListener flushListener3 = walNode.log(memTable2.getMemTableId(), node3);
- WALEntryPosition position3 = flushListener3.getWalPipeHandler().getWalEntryPosition();
+ WALEntryPosition position3 = flushListener3.getWalEntryHandler().getWalEntryPosition();
// wait until wal flushed
walNode.rollWALFile();
while (!walNode.isAllWALEntriesConsumed() || !position3.canRead()) {