You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/25 12:18:52 UTC
[iotdb] 04/05: bind WALPipeHandler
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch pipe-wal-resource-management
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 77ce31eb61df69bd4c5dadb260df4c6aca16aaea
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu May 25 19:57:07 2023 +0800
bind WALPipeHandler
---
.../db/engine/storagegroup/TsFileProcessor.java | 10 +++-
.../listener/PipeInsertionDataNodeListener.java | 10 ++--
.../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 3 +-
.../core/event/impl/PipeTabletInsertionEvent.java | 53 +++++++++++++++-------
.../realtime/PipeRealtimeCollectEventFactory.java | 5 +-
.../db/pipe/resource/wal/PipeWALResource.java | 4 ++
.../pipe/resource/wal/PipeWALResourceManager.java | 11 +++++
.../apache/iotdb/db/wal/utils/WALPipeHandler.java | 14 ++++++
.../core/collector/PipeRealtimeCollectTest.java | 6 +++
9 files changed, 91 insertions(+), 25 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index a0a36e3a1c0..6eceea951d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -277,7 +277,10 @@ public class TsFileProcessor {
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
- dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource);
+ dataRegionInfo.getDataRegion().getDataRegionId(),
+ walFlushListener.getWalPipeHandler(),
+ insertRowNode,
+ tsFileResource);
if (insertRowNode.isAligned()) {
workMemTable.insertAlignedRow(insertRowNode);
@@ -377,7 +380,10 @@ public class TsFileProcessor {
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
- dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource);
+ dataRegionInfo.getDataRegion().getDataRegionId(),
+ walFlushListener.getWalPipeHandler(),
+ insertTabletNode,
+ tsFileResource);
try {
if (insertTabletNode.isAligned()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
index 778a583f1c7..f2298310544 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.assigner.PipeDataRegionAssigner;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEventFactory;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -84,9 +85,11 @@ public class PipeInsertionDataNodeListener {
assigner.publishToAssign(PipeRealtimeCollectEventFactory.createCollectEvent(tsFileResource));
}
- // TODO: check whether the method is called on the right place.
public void listenToInsertNode(
- String dataRegionId, InsertNode insertNode, TsFileResource tsFileResource) {
+ String dataRegionId,
+ WALPipeHandler walPipeHandler,
+ InsertNode insertNode,
+ TsFileResource tsFileResource) {
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
// only events from registered data region will be collected
@@ -95,7 +98,8 @@ public class PipeInsertionDataNodeListener {
}
assigner.publishToAssign(
- PipeRealtimeCollectEventFactory.createCollectEvent(insertNode, tsFileResource));
+ PipeRealtimeCollectEventFactory.createCollectEvent(
+ walPipeHandler, insertNode, tsFileResource));
}
/////////////////////////////// singleton ///////////////////////////////
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 6b414c03b36..65837f31d42 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -130,7 +131,7 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
}
private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
- throws PipeException, TException {
+ throws PipeException, TException, WALPipeException {
final TPipeTransferResp resp =
client.pipeTransfer(
PipeTransferInsertNodeReq.toTPipeTransferReq(pipeTabletInsertionEvent.getInsertNode()));
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index 62c599b66c1..476f9e6f12a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -21,28 +21,32 @@ package org.apache.iotdb.db.pipe.core.event.impl;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.wal.exception.WALPipeException;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedEvent {
- private final InsertNode insertNode;
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletInsertionEvent.class);
- private final AtomicInteger referenceCount;
+ private final WALPipeHandler walPipeHandler;
- public PipeTabletInsertionEvent(InsertNode insertNode) {
- this.insertNode = insertNode;
- this.referenceCount = new AtomicInteger(0);
+ public PipeTabletInsertionEvent(WALPipeHandler walPipeHandler) {
+ this.walPipeHandler = walPipeHandler;
}
- public InsertNode getInsertNode() {
- return insertNode;
+ public InsertNode getInsertNode() throws WALPipeException {
+ return walPipeHandler.getValue();
}
@Override
@@ -62,26 +66,41 @@ public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedE
@Override
public boolean increaseReferenceCount(String holderMessage) {
- // TODO: use WALPipeHandler pinMemtable
- referenceCount.incrementAndGet();
- return true;
+ try {
+ PipeResourceManager.wal().pin(walPipeHandler.getMemTableId(), walPipeHandler);
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn(
+ String.format(
+ "Increase reference count for memtable %d error. Holder Message: %s",
+ walPipeHandler.getMemTableId(), holderMessage),
+ e);
+ return false;
+ }
}
@Override
public boolean decreaseReferenceCount(String holderMessage) {
- // TODO: use WALPipeHandler unpinMemetable
- referenceCount.decrementAndGet();
- return true;
+ try {
+ PipeResourceManager.wal().unpin(walPipeHandler.getMemTableId());
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn(
+ String.format(
+ "Decrease reference count for memtable %d error. Holder Message: %s",
+ walPipeHandler.getMemTableId(), holderMessage),
+ e);
+ return false;
+ }
}
@Override
public int getReferenceCount() {
- // TODO: use WALPipeHandler unpinMemetable
- return referenceCount.get();
+ return PipeResourceManager.wal().getReferenceCount(walPipeHandler.getMemTableId());
}
@Override
public String toString() {
- return "PipeTabletInsertionEvent{" + "insertNode=" + insertNode + '}';
+ return "PipeTabletInsertionEvent{" + "walPipeHandler=" + walPipeHandler + '}';
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index 4c98c5193be..a0961624dbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
public class PipeRealtimeCollectEventFactory {
@@ -34,9 +35,9 @@ public class PipeRealtimeCollectEventFactory {
}
public static PipeRealtimeCollectEvent createCollectEvent(
- InsertNode node, TsFileResource resource) {
+ WALPipeHandler walPipeHandler, InsertNode insertNode, TsFileResource resource) {
return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(
- new PipeTabletInsertionEvent(node), node, resource);
+ new PipeTabletInsertionEvent(walPipeHandler), insertNode, resource);
}
private PipeRealtimeCollectEventFactory() {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 844420272bd..8d594629bf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -155,4 +155,8 @@ public class PipeWALResource implements AutoCloseable {
referenceCount.set(0);
}
+
+ public int getReferenceCount() {
+ return referenceCount.get();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index c187a29f781..18b942496cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -95,4 +95,15 @@ public class PipeWALResourceManager implements AutoCloseable {
}
}
}
+
+ public int getReferenceCount(long memtableId) {
+ final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+ lock.lock();
+ try {
+ return memtableIdToPipeWALResourceMap.get(memtableId).getReferenceCount();
+ } finally {
+ lock.unlock();
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
index 80333cf8ffb..3392731f148 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
@@ -133,4 +133,18 @@ public class WALPipeHandler {
public void setSize(int size) {
this.walEntryPosition.setSize(size);
}
+
+ @Override
+ public String toString() {
+ return "WALPipeHandler{"
+ + "memTableId="
+ + memTableId
+ + ", value="
+ + value
+ + ", walEntryPosition="
+ + walEntryPosition
+ + ", walNode="
+ + walNode
+ + '}';
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 7cd705af588..d18fc026678 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCo
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -57,6 +58,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import static org.mockito.Mockito.mock;
+
public class PipeRealtimeCollectTest {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeCollectTest.class);
@@ -229,6 +232,7 @@ public class PipeRealtimeCollectTest {
}
private Future<?> write2DataRegion(int writeNum, String dataRegionId, int startNum) {
+
File dataRegionDir =
new File(tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0");
boolean ignored = dataRegionDir.mkdirs();
@@ -249,6 +253,7 @@ public class PipeRealtimeCollectTest {
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionId,
+ mock(WALPipeHandler.class),
new InsertRowNode(
new PlanNodeId(String.valueOf(i)),
new PartialPath(device),
@@ -262,6 +267,7 @@ public class PipeRealtimeCollectTest {
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionId,
+ mock(WALPipeHandler.class),
new InsertRowNode(
new PlanNodeId(String.valueOf(i)),
new PartialPath(device),