You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/11/10 10:16:27 UTC
[iotdb] 01/01: Split WAL buffer into two to reduce waiting time
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch split_wal_buffer
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ec4440c03babf49179535dcbb5883d9dbfc46761
Author: jt <jt...@163.com>
AuthorDate: Tue Nov 10 18:15:22 2020 +0800
Split the WAL buffer into two half-size buffers so that writes can continue into one while the other is being flushed, reducing waiting time
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 2 +-
.../db/engine/storagegroup/TsFileProcessor.java | 1 +
.../db/exception/TsFileProcessorException.java | 2 +-
.../org/apache/iotdb/db/utils/SerializeUtils.java | 6 +-
.../org/apache/iotdb/db/writelog/io/LogWriter.java | 9 +-
.../db/writelog/node/ExclusiveWriteLogNode.java | 155 ++++++++++++++++-----
.../apache/iotdb/db/writelog/WriteLogNodeTest.java | 11 +-
7 files changed, 145 insertions(+), 41 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index e9f4c92..1962cfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -222,7 +222,7 @@ public class MemTableFlushTask {
TimeUnit.MILLISECONDS.sleep(10);
} catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
logger.error("Storage group {} memtable {}, io task is interrupted.", storageGroup
- , memTable.getVersion(), e);
+ , memTable.getVersion());
// generally it is because the thread pool is shutdown so the task should be aborted
break;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index b810774..c05bb25 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -631,6 +631,7 @@ public class TsFileProcessor {
storageGroupName, tsFileResource.getTsFile().getName(), tobeFlushed.getMemTableMap());
return;
}
+ logger.warn("Before flush listeners are called");
for (FlushListener flushListener : flushListeners) {
flushListener.onFlushStart(tobeFlushed);
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java b/server/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
index 5ff1e3b..4c7e72c 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
@@ -29,6 +29,6 @@ public class TsFileProcessorException extends IoTDBException {
}
public TsFileProcessorException(Exception exception) {
- super(exception.getMessage(), TSStatusCode.TSFILE_PROCESSOR_ERROR.getStatusCode());
+ super(exception, TSStatusCode.TSFILE_PROCESSOR_ERROR.getStatusCode());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
index ea6234d..0150f44 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.In;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -509,11 +510,14 @@ public class SerializeUtils {
int idLastPos = str.indexOf(',', idFirstPos);
int dataPortFirstPos = str.indexOf("dataPort:", idLastPos) + "dataPort:".length();
int dataPortLastPos = str.indexOf(')', dataPortFirstPos);
+ int clientPortFirstPos = str.indexOf("clientPort:", dataPortLastPos) + "clientPort:".length();
+ int clientPortLastPos = str.indexOf(')', clientPortFirstPos);
String ip = str.substring(ipFirstPos, ipLastPos);
int metaPort = Integer.parseInt(str.substring(metaPortFirstPos, metaPortLastPos));
int id = Integer.parseInt(str.substring(idFirstPos, idLastPos));
int dataPort = Integer.parseInt(str.substring(dataPortFirstPos, dataPortLastPos));
- return new Node(ip, metaPort, id, dataPort);
+ int clientPort = Integer.parseInt(str.substring(clientPortFirstPos, clientPortLastPos));
+ return new Node(ip, metaPort, id, dataPort, clientPort);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java b/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
index c270a16..0ba9f74 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
@@ -84,7 +84,7 @@ public class LogWriter implements ILogWriter {
@Override
public void force() throws IOException {
- if (channel != null) {
+ if (channel != null && channel.isOpen()) {
channel.force(true);
}
}
@@ -98,4 +98,11 @@ public class LogWriter implements ILogWriter {
channel = null;
}
}
+
+ @Override
+ public String toString() {
+ return "LogWriter{" +
+ "logFile=" + logFile +
+ '}';
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 680ac65..f37da85 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -18,14 +18,18 @@
*/
package org.apache.iotdb.db.writelog.node;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -55,16 +59,25 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private ByteBuffer logBuffer = ByteBuffer
- .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize());
+ private ByteBuffer logBufferWorking = ByteBuffer
+ .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ private ByteBuffer logBufferIdle = ByteBuffer
+ .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ private ByteBuffer logBufferFlushing;
- private ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Object switchBufferCondition = new Object();
+ private ReentrantLock lock = new ReentrantLock();
+ private static final ExecutorService FLUSH_BUFFER_THREAD_POOL =
+ Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat("Flush-WAL-Thread-%d").setDaemon(true).build());
private long fileId = 0;
private long lastFlushedId = 0;
private int bufferedLogNum = 0;
+ private boolean deleted;
+
/**
* constructor of ExclusiveWriteLogNode.
*
@@ -75,13 +88,16 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
this.logDirectory =
DirectoryManager.getInstance().getWALFolder() + File.separator + this.identifier;
if (SystemFileFactory.INSTANCE.getFile(logDirectory).mkdirs()) {
- logger.info("create the WAL folder {}." + logDirectory);
+ logger.info("create the WAL folder {}", logDirectory);
}
}
@Override
public void write(PhysicalPlan plan) throws IOException {
- lock.writeLock().lock();
+ if (deleted) {
+ throw new IOException("WAL node deleted");
+ }
+ lock.lock();
try {
putLog(plan);
if (bufferedLogNum >= config.getFlushWalThreshold()) {
@@ -91,43 +107,57 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
throw new IOException(
"Log cannot fit into the buffer, please increase wal_buffer_size", e);
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
}
private void putLog(PhysicalPlan plan) {
- logBuffer.mark();
+ logBufferWorking.mark();
try {
- plan.serialize(logBuffer);
+ plan.serialize(logBufferWorking);
} catch (BufferOverflowException e) {
logger.info("WAL BufferOverflow !");
- logBuffer.reset();
+ logBufferWorking.reset();
sync();
- plan.serialize(logBuffer);
+ plan.serialize(logBufferWorking);
}
- bufferedLogNum ++;
+ bufferedLogNum++;
}
@Override
public void close() {
sync();
forceWal();
- lock.writeLock().lock();
+ lock.lock();
try {
+ synchronized (switchBufferCondition) {
+ while (logBufferFlushing != null && !deleted) {
+ switchBufferCondition.wait();
+ }
+ switchBufferCondition.notifyAll();
+ }
+
if (this.currentFileWriter != null) {
this.currentFileWriter.close();
+ logger.warn("WAL file {} is closed", currentFileWriter);
this.currentFileWriter = null;
}
logger.debug("Log node {} closed successfully", identifier);
} catch (IOException e) {
logger.error("Cannot close log node {} because:", identifier, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("Waiting for current buffer being flushed interrupted");
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
}
@Override
public void forceSync() {
+ if (deleted) {
+ return;
+ }
sync();
forceWal();
}
@@ -135,23 +165,24 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
@Override
public void notifyStartFlush() {
- lock.writeLock().lock();
+ lock.lock();
try {
close();
nextFileWriter();
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
}
@Override
public void notifyEndFlush() {
- lock.writeLock().lock();
+ lock.lock();
try {
- File logFile = SystemFileFactory.INSTANCE.getFile(logDirectory, WAL_FILE_NAME + ++lastFlushedId);
+ File logFile = SystemFileFactory.INSTANCE
+ .getFile(logDirectory, WAL_FILE_NAME + ++lastFlushedId);
discard(logFile);
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
}
@@ -167,13 +198,13 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
@Override
public void delete() throws IOException {
- lock.writeLock().lock();
+ lock.lock();
try {
- logBuffer.clear();
close();
FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(logDirectory));
+ deleted = true;
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
}
@@ -199,7 +230,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
}
private void forceWal() {
- lock.writeLock().lock();
+ lock.lock();
try {
try {
if (currentFileWriter != null) {
@@ -209,28 +240,81 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
logger.error("Log node {} force failed.", identifier, e);
}
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
}
private void sync() {
- lock.writeLock().lock();
+ lock.lock();
try {
if (bufferedLogNum == 0) {
return;
}
- try {
- getCurrentFileWriter().write(logBuffer);
- } catch (IOException e) {
- logger.error("Log node {} sync failed, change system mode to read-only", identifier, e);
- IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
- return;
- }
- logBuffer.clear();
+ switchBufferWorkingToFlushing();
+ ILogWriter currWriter = getCurrentFileWriter();
+ FLUSH_BUFFER_THREAD_POOL.submit(() -> flushBuffer(currWriter));
+ switchBufferIdleToWorking();
+
bufferedLogNum = 0;
logger.debug("Log node {} ends sync.", identifier);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("Waiting for available buffer interrupted");
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
+ }
+ }
+
+ private void flushBuffer(ILogWriter writer) {
+ try {
+ writer.write(logBufferFlushing);
+ } catch (ClosedChannelException e) {
+ // ignore
+ } catch (IOException e) {
+ logger.error("Log node {} sync failed, change system mode to read-only", identifier, e);
+ IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+ return;
+ }
+ logBufferFlushing.clear();
+
+ try {
+ switchBufferFlushingToIdle();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void switchBufferWorkingToFlushing() throws InterruptedException {
+ synchronized (switchBufferCondition) {
+ while (logBufferFlushing != null && !deleted) {
+ switchBufferCondition.wait();
+ }
+ logBufferFlushing = logBufferWorking;
+ logBufferWorking = null;
+ switchBufferCondition.notifyAll();
+ }
+ }
+
+ private void switchBufferIdleToWorking() throws InterruptedException {
+ synchronized (switchBufferCondition) {
+ while (logBufferIdle == null && !deleted) {
+ switchBufferCondition.wait();
+ }
+ logBufferWorking = logBufferIdle;
+ logBufferIdle = null;
+ switchBufferCondition.notifyAll();
+ }
+ }
+
+ private void switchBufferFlushingToIdle() throws InterruptedException {
+ synchronized (switchBufferCondition) {
+ while (logBufferIdle != null && !deleted) {
+ switchBufferCondition.wait();
+ }
+ logBufferIdle = logBufferFlushing;
+ logBufferIdle.clear();
+ logBufferFlushing = null;
+ switchBufferCondition.notifyAll();
}
}
@@ -247,6 +331,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
if (newFile.getParentFile().mkdirs()) {
logger.info("create WAL parent folder {}.", newFile.getParent());
}
+ logger.warn("WAL file {} is opened", newFile);
currentFileWriter = new LogWriter(newFile);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index e779e2d..fe5610c 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -180,9 +180,13 @@ public class WriteLogNodeTest {
File walFile = new File(
config.getWalDir() + File.separator + "root.logTestDevice" + File.separator + "wal1");
- assertTrue(!walFile.exists());
+ assertFalse(walFile.exists());
logNode.write(deletePlan);
+ System.out.println("Waiting for wal file to be created");
+ while (!walFile.exists()) {
+
+ }
assertTrue(walFile.exists());
logNode.delete();
@@ -209,7 +213,10 @@ public class WriteLogNodeTest {
File walFile = new File(
config.getWalDir() + File.separator + "root.logTestDevice" + File.separator + "wal1");
- assertTrue(walFile.exists());
+ System.out.println("Waiting for wal to be created");
+ while (!walFile.exists()) {
+
+ }
assertTrue(new File(logNode.getLogDirectory()).exists());
logNode.delete();