You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/18 02:09:58 UTC
[iotdb] 01/01: new ExclusiveWriteLogNode
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch new_wal
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 52763281484e21ddab9f71455def7907b28add2d
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Aug 18 10:09:04 2021 +0800
new ExclusiveWriteLogNode
---
.../db/writelog/node/ExclusiveWriteLogNode.java | 86 ++++++++--------------
1 file changed, 32 insertions(+), 54 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index c947963..56fafe2 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.writelog.node;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
@@ -28,7 +29,6 @@ import org.apache.iotdb.db.writelog.io.ILogWriter;
import org.apache.iotdb.db.writelog.io.LogWriter;
import org.apache.iotdb.db.writelog.io.MultiFileLogReader;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
/** This WriteLogNode is used to manage insert ahead logs of a TsFile. */
@@ -51,33 +51,31 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
public static final String WAL_FILE_NAME = "wal";
private static final Logger logger = LoggerFactory.getLogger(ExclusiveWriteLogNode.class);
- private String identifier;
+ private final String identifier;
- private String logDirectory;
+ private final String logDirectory;
private ILogWriter currentFileWriter;
- private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private ByteBuffer logBufferWorking;
- private ByteBuffer logBufferIdle;
- private ByteBuffer logBufferFlushing;
+ private volatile ByteBuffer logBufferWorking;
+ private volatile ByteBuffer logBufferIdle;
+ private volatile ByteBuffer logBufferFlushing;
// used for the convenience of deletion
- private ByteBuffer[] bufferArray;
+ private volatile ByteBuffer[] bufferArray;
private final Object switchBufferCondition = new Object();
- private ReentrantLock lock = new ReentrantLock();
- private static final ExecutorService FLUSH_BUFFER_THREAD_POOL =
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("Flush-WAL-Thread-%d").setDaemon(true).build());
+ private final ReentrantLock lock = new ReentrantLock();
+ private final ExecutorService FLUSH_BUFFER_THREAD_POOL;
private long fileId = 0;
private long lastFlushedId = 0;
private int bufferedLogNum = 0;
- private boolean deleted;
+ private final AtomicBoolean deleted = new AtomicBoolean(false);
/**
* constructor of ExclusiveWriteLogNode.
@@ -91,6 +89,9 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
if (SystemFileFactory.INSTANCE.getFile(logDirectory).mkdirs()) {
logger.info("create the WAL folder {}.", logDirectory);
}
+ // this.identifier contains the storage group name and the TsFile name, so the flush thread's name identifies this node.
+ FLUSH_BUFFER_THREAD_POOL =
+ IoTDBThreadPoolFactory.newSingleThreadExecutor("Flush-WAL-Thread-" + this.identifier);
}
@Override
@@ -102,7 +103,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
@Override
public void write(PhysicalPlan plan) throws IOException {
- if (deleted) {
+ if (deleted.get()) {
throw new IOException("WAL node deleted");
}
lock.lock();
@@ -138,7 +139,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
lock.lock();
try {
synchronized (switchBufferCondition) {
- while (logBufferFlushing != null && !deleted) {
+ while (logBufferFlushing != null && !deleted.get()) {
switchBufferCondition.wait();
}
switchBufferCondition.notifyAll();
@@ -151,7 +152,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
}
logger.debug("Log node {} closed successfully", identifier);
} catch (IOException e) {
- logger.error("Cannot close log node {} because:", identifier, e);
+ logger.warn("Cannot close log node {} because:", identifier, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Waiting for current buffer being flushed interrupted");
@@ -162,7 +163,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
@Override
public void forceSync() {
- if (deleted) {
+ if (deleted.get()) {
return;
}
sync();
@@ -208,9 +209,10 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
try {
close();
FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(logDirectory));
- deleted = true;
+ deleted.set(true);
return this.bufferArray;
} finally {
+ FLUSH_BUFFER_THREAD_POOL.shutdown();
lock.unlock();
}
}
@@ -232,7 +234,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
FileUtils.forceDelete(logFile);
logger.info("Log node {} cleaned old file", identifier);
} catch (IOException e) {
- logger.error("Old log file {} of {} cannot be deleted", logFile.getName(), identifier, e);
+ logger.warn("Old log file {} of {} cannot be deleted", logFile.getName(), identifier, e);
}
}
}
@@ -245,7 +247,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
currentFileWriter.force();
}
} catch (IOException e) {
- logger.error("Log node {} force failed.", identifier, e);
+ logger.warn("Log node {} force failed.", identifier, e);
}
} finally {
lock.unlock();
@@ -261,8 +263,6 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
switchBufferWorkingToFlushing();
ILogWriter currWriter = getCurrentFileWriter();
FLUSH_BUFFER_THREAD_POOL.submit(() -> flushBuffer(currWriter));
- switchBufferIdleToWorking();
-
bufferedLogNum = 0;
logger.debug("Log node {} ends sync.", identifier);
} catch (InterruptedException e) {
@@ -281,50 +281,28 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
} catch (ClosedChannelException e) {
// ignore
} catch (IOException e) {
- logger.error("Log node {} sync failed, change system mode to read-only", identifier, e);
+ logger.warn("Log node {} sync failed, change system mode to read-only", identifier, e);
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
return;
}
- logBufferFlushing.clear();
-
- try {
- switchBufferFlushingToIdle();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- private void switchBufferWorkingToFlushing() throws InterruptedException {
+ // the flushed buffer becomes the idle buffer; wake every thread waiting on switchBufferCondition
synchronized (switchBufferCondition) {
- while (logBufferFlushing != null && !deleted) {
- switchBufferCondition.wait();
- }
- logBufferFlushing = logBufferWorking;
- logBufferWorking = null;
+ logBufferIdle = logBufferFlushing;
+ logBufferFlushing = null;
switchBufferCondition.notifyAll();
}
}
- private void switchBufferIdleToWorking() throws InterruptedException {
+ private void switchBufferWorkingToFlushing() throws InterruptedException {
synchronized (switchBufferCondition) {
- while (logBufferIdle == null && !deleted) {
- switchBufferCondition.wait();
+ while (logBufferFlushing != null && !deleted.get()) {
+ switchBufferCondition.wait(100);
}
+ logBufferFlushing = logBufferWorking;
logBufferWorking = logBufferIdle;
+ logBufferWorking.clear();
logBufferIdle = null;
- switchBufferCondition.notifyAll();
- }
- }
-
- private void switchBufferFlushingToIdle() throws InterruptedException {
- synchronized (switchBufferCondition) {
- while (logBufferIdle != null && !deleted) {
- switchBufferCondition.wait();
- }
- logBufferIdle = logBufferFlushing;
- logBufferIdle.clear();
- logBufferFlushing = null;
- switchBufferCondition.notifyAll();
}
}