You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/13 04:38:32 UTC

[iotdb] 01/01: Pipe: use PipeTaskCoordinatorLock instead of ReentrantLock for multi thread sync (#11129)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch 1.2-fix-pipe-lock
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 45b934d2570a2a200c90a881699f6afae1e62ff7
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Sep 13 12:37:16 2023 +0800

    Pipe: use PipeTaskCoordinatorLock instead of ReentrantLock for multi thread sync (#11129)
    
    (cherry picked from commit c23013463d2508eac0cf8ca0c069afd7a3fe213d)
---
 .../manager/pipe/task/PipeTaskCoordinator.java     |  7 +--
 .../manager/pipe/task/PipeTaskCoordinatorLock.java | 73 ++++++++++++++++++++++
 2 files changed, 75 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index 832f6a1a84a..5bec78d66b9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
 
 public class PipeTaskCoordinator {
 
@@ -49,13 +48,13 @@ public class PipeTaskCoordinator {
   // NEVER EXPOSE THIS DIRECTLY TO THE OUTSIDE
   private final PipeTaskInfo pipeTaskInfo;
 
-  private final ReentrantLock pipeTaskCoordinatorLock;
+  private final PipeTaskCoordinatorLock pipeTaskCoordinatorLock;
   private AtomicReference<PipeTaskInfo> pipeTaskInfoHolder;
 
   public PipeTaskCoordinator(ConfigManager configManager, PipeTaskInfo pipeTaskInfo) {
     this.configManager = configManager;
     this.pipeTaskInfo = pipeTaskInfo;
-    this.pipeTaskCoordinatorLock = new ReentrantLock(true);
+    this.pipeTaskCoordinatorLock = new PipeTaskCoordinatorLock();
   }
 
   /**
@@ -66,7 +65,6 @@ public class PipeTaskCoordinator {
    */
   public AtomicReference<PipeTaskInfo> lock() {
     pipeTaskCoordinatorLock.lock();
-    LOGGER.info("Pipe task coordinator locked.");
 
     pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
     return pipeTaskInfoHolder;
@@ -86,7 +84,6 @@ public class PipeTaskCoordinator {
 
     try {
       pipeTaskCoordinatorLock.unlock();
-      LOGGER.info("Pipe task coordinator unlocked.");
       return true;
     } catch (IllegalMonitorStateException ignored) {
       // This is thrown if unlock() is called without lock() called first.
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinatorLock.java
new file mode 100644
index 00000000000..78c1f28ba21
--- /dev/null
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinatorLock.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.task;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * PipeTaskCoordinatorLock is a cross-thread lock for the pipe task coordinator. It is used to
+ * ensure that only one holder can execute the pipe task coordinator at a time.
+ *
+ * <p>The lock is backed by a {@link BlockingDeque} with capacity one: {@link #lock()} puts a token
+ * into the deque (blocking while another token is present) and {@link #unlock()} polls it out.
+ * Because no owner thread is recorded, the lock may be released by a thread other than the one
+ * that acquired it. For the same reason it is not reentrant: calling {@link #lock()} twice without
+ * an intervening {@link #unlock()} blocks the second call.
+ */
+public class PipeTaskCoordinatorLock {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);
+
+  // Holds at most one element: the id of the acquisition that currently owns the lock.
+  private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1);
+  // Source of per-acquisition ids; they are only used to correlate log lines.
+  private final AtomicLong idGenerator = new AtomicLong(0);
+
+  /**
+   * Acquires the lock, blocking until it is available.
+   *
+   * <p>Acquisition is uninterruptible: if the calling thread is interrupted while waiting, the
+   * interruption is logged, the wait is resumed, and the thread's interrupt status is restored
+   * once the lock has been acquired. This guarantees that the lock is held whenever this method
+   * returns normally, so a subsequent {@link #unlock()} never releases another holder's token.
+   */
+  void lock() {
+    final long id = idGenerator.incrementAndGet();
+    final String threadName = Thread.currentThread().getName();
+    LOGGER.info("PipeTaskCoordinator lock (id: {}) waiting for thread {}", id, threadName);
+
+    boolean interrupted = false;
+    try {
+      while (true) {
+        try {
+          deque.put(id);
+          break;
+        } catch (InterruptedException e) {
+          // put() clears the interrupt status when it throws, so the retry blocks again instead
+          // of spinning. Remember the interrupt and re-assert it after acquisition.
+          interrupted = true;
+          LOGGER.error(
+              "Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}",
+              threadName);
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    LOGGER.info("PipeTaskCoordinator lock (id: {}) acquired by thread {}", id, threadName);
+  }
+
+  /**
+   * Releases the lock by removing the current token from the deque.
+   *
+   * <p>May be called from any thread. If the lock is not held, an error is logged and the call is
+   * otherwise a no-op; no exception is thrown.
+   */
+  void unlock() {
+    final Long id = deque.poll();
+    if (id == null) {
+      LOGGER.error(
+          "PipeTaskCoordinator lock released by thread {} but the lock is not acquired by any thread",
+          Thread.currentThread().getName());
+    } else {
+      LOGGER.info(
+          "PipeTaskCoordinator lock (id: {}) released by thread {}",
+          id,
+          Thread.currentThread().getName());
+    }
+  }
+}