You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/25 12:18:52 UTC

[iotdb] 04/05: bind WALPipeHandle

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-wal-resource-management
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 77ce31eb61df69bd4c5dadb260df4c6aca16aaea
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu May 25 19:57:07 2023 +0800

    bind WALPipeHandle
---
 .../db/engine/storagegroup/TsFileProcessor.java    | 10 +++-
 .../listener/PipeInsertionDataNodeListener.java    | 10 ++--
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      |  3 +-
 .../core/event/impl/PipeTabletInsertionEvent.java  | 53 +++++++++++++++-------
 .../realtime/PipeRealtimeCollectEventFactory.java  |  5 +-
 .../db/pipe/resource/wal/PipeWALResource.java      |  4 ++
 .../pipe/resource/wal/PipeWALResourceManager.java  | 11 +++++
 .../apache/iotdb/db/wal/utils/WALPipeHandler.java  | 14 ++++++
 .../core/collector/PipeRealtimeCollectTest.java    |  6 +++
 9 files changed, 91 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index a0a36e3a1c0..6eceea951d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -277,7 +277,10 @@ public class TsFileProcessor {
 
     PipeInsertionDataNodeListener.getInstance()
         .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource);
+            dataRegionInfo.getDataRegion().getDataRegionId(),
+            walFlushListener.getWalPipeHandler(),
+            insertRowNode,
+            tsFileResource);
 
     if (insertRowNode.isAligned()) {
       workMemTable.insertAlignedRow(insertRowNode);
@@ -377,7 +380,10 @@ public class TsFileProcessor {
 
     PipeInsertionDataNodeListener.getInstance()
         .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource);
+            dataRegionInfo.getDataRegion().getDataRegionId(),
+            walFlushListener.getWalPipeHandler(),
+            insertTabletNode,
+            tsFileResource);
 
     try {
       if (insertTabletNode.isAligned()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
index 778a583f1c7..f2298310544 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.assigner.PipeDataRegionAssigner;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEventFactory;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -84,9 +85,11 @@ public class PipeInsertionDataNodeListener {
     assigner.publishToAssign(PipeRealtimeCollectEventFactory.createCollectEvent(tsFileResource));
   }
 
-  // TODO: check whether the method is called on the right place.
   public void listenToInsertNode(
-      String dataRegionId, InsertNode insertNode, TsFileResource tsFileResource) {
+      String dataRegionId,
+      WALPipeHandler walPipeHandler,
+      InsertNode insertNode,
+      TsFileResource tsFileResource) {
     final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
 
     // only events from registered data region will be collected
@@ -95,7 +98,8 @@ public class PipeInsertionDataNodeListener {
     }
 
     assigner.publishToAssign(
-        PipeRealtimeCollectEventFactory.createCollectEvent(insertNode, tsFileResource));
+        PipeRealtimeCollectEventFactory.createCollectEvent(
+            walPipeHandler, insertNode, tsFileResource));
   }
 
   /////////////////////////////// singleton ///////////////////////////////
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 6b414c03b36..65837f31d42 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
 import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -130,7 +131,7 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
   }
 
   private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
-      throws PipeException, TException {
+      throws PipeException, TException, WALPipeException {
     final TPipeTransferResp resp =
         client.pipeTransfer(
             PipeTransferInsertNodeReq.toTPipeTransferReq(pipeTabletInsertionEvent.getInsertNode()));
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index 62c599b66c1..476f9e6f12a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -21,28 +21,32 @@ package org.apache.iotdb.db.pipe.core.event.impl;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.wal.exception.WALPipeException;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 
 public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedEvent {
 
-  private final InsertNode insertNode;
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletInsertionEvent.class);
 
-  private final AtomicInteger referenceCount;
+  private final WALPipeHandler walPipeHandler;
 
-  public PipeTabletInsertionEvent(InsertNode insertNode) {
-    this.insertNode = insertNode;
-    this.referenceCount = new AtomicInteger(0);
+  public PipeTabletInsertionEvent(WALPipeHandler walPipeHandler) {
+    this.walPipeHandler = walPipeHandler;
   }
 
-  public InsertNode getInsertNode() {
-    return insertNode;
+  public InsertNode getInsertNode() throws WALPipeException {
+    return walPipeHandler.getValue();
   }
 
   @Override
@@ -62,26 +66,41 @@ public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedE
 
   @Override
   public boolean increaseReferenceCount(String holderMessage) {
-    // TODO: use WALPipeHandler pinMemtable
-    referenceCount.incrementAndGet();
-    return true;
+    try {
+      PipeResourceManager.wal().pin(walPipeHandler.getMemTableId(), walPipeHandler);
+      return true;
+    } catch (Exception e) {
+      LOGGER.warn(
+          String.format(
+              "Increase reference count for memtable %d error. Holder Message: %s",
+              walPipeHandler.getMemTableId(), holderMessage),
+          e);
+      return false;
+    }
   }
 
   @Override
   public boolean decreaseReferenceCount(String holderMessage) {
-    // TODO: use WALPipeHandler unpinMemetable
-    referenceCount.decrementAndGet();
-    return true;
+    try {
+      PipeResourceManager.wal().unpin(walPipeHandler.getMemTableId());
+      return true;
+    } catch (Exception e) {
+      LOGGER.warn(
+          String.format(
+              "Decrease reference count for memtable %d error. Holder Message: %s",
+              walPipeHandler.getMemTableId(), holderMessage),
+          e);
+      return false;
+    }
   }
 
   @Override
   public int getReferenceCount() {
-    // TODO: use WALPipeHandler unpinMemetable
-    return referenceCount.get();
+    return PipeResourceManager.wal().getReferenceCount(walPipeHandler.getMemTableId());
   }
 
   @Override
   public String toString() {
-    return "PipeTabletInsertionEvent{" + "insertNode=" + insertNode + '}';
+    return "PipeTabletInsertionEvent{" + "walPipeHandler=" + walPipeHandler + '}';
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index 4c98c5193be..a0961624dbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
 
 public class PipeRealtimeCollectEventFactory {
 
@@ -34,9 +35,9 @@ public class PipeRealtimeCollectEventFactory {
   }
 
   public static PipeRealtimeCollectEvent createCollectEvent(
-      InsertNode node, TsFileResource resource) {
+      WALPipeHandler walPipeHandler, InsertNode insertNode, TsFileResource resource) {
     return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(
-        new PipeTabletInsertionEvent(node), node, resource);
+        new PipeTabletInsertionEvent(walPipeHandler), insertNode, resource);
   }
 
   private PipeRealtimeCollectEventFactory() {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 844420272bd..8d594629bf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -155,4 +155,8 @@ public class PipeWALResource implements AutoCloseable {
 
     referenceCount.set(0);
   }
+
+  public int getReferenceCount() {
+    return referenceCount.get();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index c187a29f781..18b942496cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -95,4 +95,15 @@ public class PipeWALResourceManager implements AutoCloseable {
       }
     }
   }
+
+  public int getReferenceCount(long memtableId) {
+    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+    lock.lock();
+    try {
+      return memtableIdToPipeWALResourceMap.get(memtableId).getReferenceCount();
+    } finally {
+      lock.unlock();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
index 80333cf8ffb..3392731f148 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
@@ -133,4 +133,18 @@ public class WALPipeHandler {
   public void setSize(int size) {
     this.walEntryPosition.setSize(size);
   }
+
+  @Override
+  public String toString() {
+    return "WALPipeHandler{"
+        + "memTableId="
+        + memTableId
+        + ", value="
+        + value
+        + ", walEntryPosition="
+        + walEntryPosition
+        + ", walNode="
+        + walNode
+        + '}';
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 7cd705af588..d18fc026678 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCo
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -57,6 +58,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
+import static org.mockito.Mockito.mock;
+
 public class PipeRealtimeCollectTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeCollectTest.class);
@@ -229,6 +232,7 @@ public class PipeRealtimeCollectTest {
   }
 
   private Future<?> write2DataRegion(int writeNum, String dataRegionId, int startNum) {
+
     File dataRegionDir =
         new File(tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0");
     boolean ignored = dataRegionDir.mkdirs();
@@ -249,6 +253,7 @@ public class PipeRealtimeCollectTest {
             PipeInsertionDataNodeListener.getInstance()
                 .listenToInsertNode(
                     dataRegionId,
+                    mock(WALPipeHandler.class),
                     new InsertRowNode(
                         new PlanNodeId(String.valueOf(i)),
                         new PartialPath(device),
@@ -262,6 +267,7 @@ public class PipeRealtimeCollectTest {
             PipeInsertionDataNodeListener.getInstance()
                 .listenToInsertNode(
                     dataRegionId,
+                    mock(WALPipeHandler.class),
                     new InsertRowNode(
                         new PlanNodeId(String.valueOf(i)),
                         new PartialPath(device),