You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/26 06:14:56 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: remove lock in PrimitiveDataListPool
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new f4b2aee remove lock in PrimitiveDataListPool
new 543a438 Merge remote-tracking branch 'origin/feature_async_close_tsfile' into feature_async_close_tsfile
f4b2aee is described below
commit f4b2aee20f6a6d713f210f87c16d25d5ec388a45
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jun 26 11:40:07 2019 +0800
remove lock in PrimitiveDataListPool
---
.../filenodeV2/UnsealedTsFileProcessorV2.java | 13 +++-
.../db/engine/memtable/MemTableFlushTaskV2.java | 14 +---
.../iotdb/db/qp/physical/crud/InsertPlan.java | 9 +++
.../iotdb/db/utils/PrimitiveDataListPool.java | 32 +++++-----
.../db/writelog/node/ExclusiveWriteLogNode.java | 74 ++++++++++++++++------
5 files changed, 92 insertions(+), 50 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 087d8a2..9708dc3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -258,7 +258,9 @@ public class UnsealedTsFileProcessorV2 {
if (workMemTable == null) {
return;
}
- logNode.notifyStartFlush();
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+ logNode.notifyStartFlush();
+ }
flushingMemTables.addLast(workMemTable);
workMemTable.setVersion(versionController.nextVersion());
FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
@@ -303,8 +305,15 @@ public class UnsealedTsFileProcessorV2 {
MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer, storageGroupName,
this::releaseFlushedMemTableCallback);
flushTask.flushMemTable();
+ long start = System.currentTimeMillis();
MemTablePool.getInstance().putBack(memTableToFlush, storageGroupName);
- logNode.notifyEndFlush();
+ long elapse = System.currentTimeMillis() - start;
+ if (elapse > 1000) {
+ LOGGER.info("release a memtable cost: {}", elapse);
+ }
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+ logNode.notifyEndFlush();
+ }
LOGGER.info("flush a memtable has finished");
} else {
LOGGER.info(
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index 3a7a994..5502e30 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -128,12 +128,8 @@ public class MemTableFlushTaskV2 {
}
} else {
if (task instanceof String) {
-// LOGGER.info("Storage group {} memtable {}, issues a start flush chunk group task.",
-// storageGroup, memTable.getVersion());
ioTaskQueue.add(task);
} else if (task instanceof ChunkGroupIoTask) {
-// LOGGER.info("Storage group {} memtable {}, issues a end flush chunk group task.",
-// storageGroup, memTable.getVersion());
ioTaskQueue.add(task);
} else {
long starTime = System.currentTimeMillis();
@@ -150,8 +146,6 @@ public class MemTableFlushTaskV2 {
memTable.getVersion(), e);
throw new RuntimeException(e);
}
-// LOGGER.info("Storage group {} memtable {}, issues a write chunk task.",
-// storageGroup, memTable.getVersion());
memSerializeTime += System.currentTimeMillis() - starTime;
}
}
@@ -195,16 +189,10 @@ public class MemTableFlushTaskV2 {
long starTime = System.currentTimeMillis();
try {
if (ioTask instanceof String) {
-// LOGGER.info("Storage group {} memtable {}, start a chunk group.", storageGroup,
-// memTable.getVersion());
tsFileIoWriter.startChunkGroup((String) ioTask);
} else if (ioTask instanceof IChunkWriter) {
-// LOGGER.info("Storage group {} memtable {}, writing a series to file.", storageGroup,
-// memTable.getVersion());
((IChunkWriter) ioTask).writeToFileWriter(tsFileIoWriter);
} else {
-// LOGGER.info("Storage group {} memtable {}, end a chunk group.", storageGroup,
-// memTable.getVersion());
ChunkGroupIoTask endGroupTask = (ChunkGroupIoTask) ioTask;
tsFileIoWriter.endChunkGroup(endGroupTask.version);
endGroupTask.finished = true;
@@ -217,7 +205,7 @@ public class MemTableFlushTaskV2 {
ioTime += System.currentTimeMillis() - starTime;
}
}
- LOGGER.info("flushing a memtable {} in storage group {}, cost {}ms", memTable.getVersion(),
+ LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
storageGroup, ioTime);
} catch (RuntimeException e) {
LOGGER.error("ioflush thread is dead", e);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index b017e3c..f98541e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -31,9 +31,13 @@ import org.apache.iotdb.db.qp.physical.transfer.SystemLogOperator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class InsertPlan extends PhysicalPlan {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalPlan.class);
+
private String deviceId;
private String[] measurements;
private TSDataType[] dataTypes;
@@ -163,6 +167,7 @@ public class InsertPlan extends PhysicalPlan {
@Override
public void serializeTo(ByteBuffer buffer) {
+ long startTime = System.currentTimeMillis();
int type = SystemLogOperator.INSERT;
buffer.put((byte) type);
buffer.put((byte) insertType);
@@ -179,6 +184,10 @@ public class InsertPlan extends PhysicalPlan {
for (String m : values) {
putString(buffer, m);
}
+ long elapsed = System.currentTimeMillis() - startTime;
+ if (elapsed > 1000) {
+ LOGGER.info("Serialize an insertion costs {}ms", elapsed);
+ }
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveDataListPool.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveDataListPool.java
index 1454857..08a3e4e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveDataListPool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveDataListPool.java
@@ -18,9 +18,9 @@
*/
package org.apache.iotdb.db.utils;
-import java.util.HashMap;
-import java.util.LinkedList;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -30,21 +30,21 @@ import org.apache.iotdb.tsfile.utils.Binary;
*/
public class PrimitiveDataListPool {
- private static final Map<Class, LinkedList<PrimitiveArrayListV2>> primitiveArrayListsMap = new HashMap<>();
+ private static final Map<Class, ConcurrentLinkedQueue<PrimitiveArrayListV2>> primitiveArrayListsMap = new ConcurrentHashMap<>();
static {
- primitiveArrayListsMap.put(boolean.class, new LinkedList<>());
- primitiveArrayListsMap.put(int.class, new LinkedList<>());
- primitiveArrayListsMap.put(long.class, new LinkedList<>());
- primitiveArrayListsMap.put(float.class, new LinkedList<>());
- primitiveArrayListsMap.put(double.class, new LinkedList<>());
- primitiveArrayListsMap.put(Binary.class, new LinkedList<>());
+ primitiveArrayListsMap.put(boolean.class, new ConcurrentLinkedQueue<>());
+ primitiveArrayListsMap.put(int.class, new ConcurrentLinkedQueue<>());
+ primitiveArrayListsMap.put(long.class, new ConcurrentLinkedQueue<>());
+ primitiveArrayListsMap.put(float.class, new ConcurrentLinkedQueue<>());
+ primitiveArrayListsMap.put(double.class, new ConcurrentLinkedQueue<>());
+ primitiveArrayListsMap.put(Binary.class, new ConcurrentLinkedQueue<>());
}
private PrimitiveDataListPool() {
}
- public synchronized PrimitiveArrayListV2 getPrimitiveDataListByDataType(TSDataType dataType) {
+ public PrimitiveArrayListV2 getPrimitiveDataListByDataType(TSDataType dataType) {
switch (dataType) {
case BOOLEAN:
return getPrimitiveDataList(boolean.class);
@@ -64,14 +64,14 @@ public class PrimitiveDataListPool {
}
private PrimitiveArrayListV2 getPrimitiveDataList(Class clazz) {
- LinkedList<PrimitiveArrayListV2> primitiveArrayList = primitiveArrayListsMap.get(clazz);
- return primitiveArrayList.isEmpty() ? new PrimitiveArrayListV2(clazz)
- : primitiveArrayList.pollFirst();
+ ConcurrentLinkedQueue<PrimitiveArrayListV2> primitiveArrayList = primitiveArrayListsMap.get(clazz);
+ PrimitiveArrayListV2 dataList = primitiveArrayList.poll();
+ return dataList == null ? new PrimitiveArrayListV2(clazz) : dataList;
}
- public synchronized void release(PrimitiveArrayListV2 primitiveArrayList) {
+ public void release(PrimitiveArrayListV2 primitiveArrayList) {
primitiveArrayList.reset();
- primitiveArrayListsMap.get(primitiveArrayList.getClazz()).addLast(primitiveArrayList);
+ primitiveArrayListsMap.get(primitiveArrayList.getClazz()).offer(primitiveArrayList);
}
public static PrimitiveDataListPool getInstance() {
@@ -86,7 +86,7 @@ public class PrimitiveDataListPool {
private static final PrimitiveDataListPool INSTANCE = new PrimitiveDataListPool();
}
- public synchronized int getPrimitiveDataListSizeByDataType(TSDataType dataType){
+ public int getPrimitiveDataListSizeByDataType(TSDataType dataType){
switch (dataType) {
case BOOLEAN:
return primitiveArrayListsMap.get(boolean.class).size();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 03158bb..b9b5a82 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<ExclusiveWriteLogNode> {
public static final String WAL_FILE_NAME = "wal";
- private static final Logger logger = LoggerFactory.getLogger(ExclusiveWriteLogNode.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExclusiveWriteLogNode.class);
private static int logBufferSize = IoTDBDescriptor.getInstance().getConfig().getWalBufferSize();
private String identifier;
@@ -74,7 +74,12 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
@Override
public void write(PhysicalPlan plan) throws IOException {
+ long lockStartTime = System.currentTimeMillis();
lock.writeLock().lock();
+ long lockElapsed = System.currentTimeMillis() - lockStartTime;
+ if (lockElapsed > 1000) {
+ LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+ }
try {
long start = System.currentTimeMillis();
@@ -85,7 +90,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
}
long elapse = System.currentTimeMillis() - start;
if (elapse > 1000) {
- logger.info("WAL insert cost {} ms", elapse);
+ LOGGER.info("WAL insert cost {} ms", elapse);
}
} catch (BufferOverflowException e) {
throw new IOException("Log cannot fit into buffer", e);
@@ -99,6 +104,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
try {
plan.serializeTo(logBuffer);
} catch (BufferOverflowException e) {
+ LOGGER.info("WAL BufferOverflow !");
logBuffer.reset();
sync();
plan.serializeTo(logBuffer);
@@ -110,22 +116,27 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
public void close() {
sync();
forceWal();
+ long lockStartTime = System.currentTimeMillis();
lock.writeLock().lock();
+ long lockElapsed = System.currentTimeMillis() - lockStartTime;
+ if (lockElapsed > 1000) {
+ LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+ }
long start = System.currentTimeMillis();
try {
if (this.currentFileWriter != null) {
this.currentFileWriter.close();
this.currentFileWriter = null;
}
- logger.debug("Log node {} closed successfully", identifier);
+ LOGGER.debug("Log node {} closed successfully", identifier);
} catch (IOException e) {
- logger.error("Cannot close log node {} because:", identifier, e);
+ LOGGER.error("Cannot close log node {} because:", identifier, e);
} finally {
lock.writeLock().unlock();
}
long elapse = System.currentTimeMillis() - start;
if (elapse > 1000) {
- logger.info("WAL log node {} close cost {} ms", identifier, elapse);
+ LOGGER.info("WAL log node {} close cost {} ms", identifier, elapse);
}
}
@@ -138,14 +149,19 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
@Override
public void notifyStartFlush() {
+ long lockStartTime = System.currentTimeMillis();
lock.writeLock().lock();
+ long lockElapsed = System.currentTimeMillis() - lockStartTime;
+ if (lockElapsed > 1000) {
+ LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+ }
try {
long start = System.currentTimeMillis();
close();
nextFileWriter();
long elapse = System.currentTimeMillis() - start;
if (elapse > 1000) {
- logger.info("WAL notifyStartFlush cost {} ms", elapse);
+ LOGGER.info("WAL notifyStartFlush cost {} ms", elapse);
}
} finally {
lock.writeLock().unlock();
@@ -154,14 +170,19 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
@Override
public void notifyEndFlush() {
+ long lockStartTime = System.currentTimeMillis();
lock.writeLock().lock();
+ long lockElapsed = System.currentTimeMillis() - lockStartTime;
+ if (lockElapsed > 1000) {
+ LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+ }
try {
long start = System.currentTimeMillis();
File logFile = new File(logDirectory, WAL_FILE_NAME + ++lastFlushedId);
discard(logFile);
long elapse = System.currentTimeMillis() - start;
if (elapse > 1000) {
- logger.info("WAL notifyEndFlush cost {} ms", elapse);
+ LOGGER.info("WAL notifyEndFlush cost {} ms", elapse);
}
} finally {
lock.writeLock().unlock();
@@ -180,7 +201,12 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
@Override
public void delete() throws IOException {
+ long lockStartTime = System.currentTimeMillis();
lock.writeLock().lock();
+ long lockElapsed = System.currentTimeMillis() - lockStartTime;
+ if (lockElapsed > 1000) {
+ LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+ }
try {
long start = System.currentTimeMillis();
logBuffer.clear();
@@ -188,7 +214,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
FileUtils.deleteDirectory(new File(logDirectory));
long elapse = System.currentTimeMillis() - start;
if (elapse > 1000) {
- logger.info("WAL delete cost {} ms", elapse);
+ LOGGER.info("WAL delete cost {} ms", elapse);
}
} finally {
lock.writeLock().unlock();
@@ -205,33 +231,38 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
private void discard(File logFile) {
if (!logFile.exists()) {
- logger.info("Log file does not exist");
+ LOGGER.info("Log file does not exist");
} else {
try {
FileUtils.forceDelete(logFile);
- logger.info("Log node {} cleaned old file", identifier);
+ LOGGER.info("Log node {} cleaned old file", identifier);
} catch (IOException e) {
- logger.error("Old log file {} of {} cannot be deleted", logFile.getName(), identifier, e);
+ LOGGER.error("Old log file {} of {} cannot be deleted", logFile.getName(), identifier, e);
}
}
}
private void forceWal() {
+ long lockStartTime = System.currentTimeMillis();
lock.writeLock().lock();
+ long lockElapsed = System.currentTimeMillis() - lockStartTime;
+ if (lockElapsed > 1000) {
+ LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+ }
try {
long start = System.currentTimeMillis();
- logger.debug("Log node {} starts force, {} logs to be forced", identifier, bufferedLogNum);
+ LOGGER.debug("Log node {} starts force, {} logs to be forced", identifier, bufferedLogNum);
try {
if (currentFileWriter != null) {
currentFileWriter.force();
}
} catch (IOException e) {
- logger.error("Log node {} force failed.", identifier, e);
+ LOGGER.error("Log node {} force failed.", identifier, e);
}
- logger.debug("Log node {} ends force.", identifier);
+ LOGGER.debug("Log node {} ends force.", identifier);
long elapse = System.currentTimeMillis() - start;
if (elapse > 1000) {
- logger.info("WAL forceWal cost {} ms", elapse);
+ LOGGER.info("WAL forceWal cost {} ms", elapse);
}
} finally {
lock.writeLock().unlock();
@@ -239,25 +270,30 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
}
private void sync() {
+ long lockStartTime = System.currentTimeMillis();
lock.writeLock().lock();
+ long lockElapsed = System.currentTimeMillis() - lockStartTime;
+ if (lockElapsed > 1000) {
+ LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+ }
try {
long start = System.currentTimeMillis();
- logger.debug("Log node {} starts sync, {} logs to be synced", identifier, bufferedLogNum);
+ LOGGER.debug("Log node {} starts sync, {} logs to be synced", identifier, bufferedLogNum);
if (bufferedLogNum == 0) {
return;
}
try {
getCurrentFileWriter().write(logBuffer);
} catch (IOException e) {
- logger.error("Log node {} sync failed", identifier, e);
+ LOGGER.error("Log node {} sync failed", identifier, e);
}
logBuffer.clear();
bufferedLogNum = 0;
- logger.debug("Log node {} ends sync.", identifier);
+ LOGGER.debug("Log node {} ends sync.", identifier);
long elapse = System.currentTimeMillis() - start;
if (elapse > 1000) {
- logger.info("WAL sync cost {} ms", elapse);
+ LOGGER.info("WAL sync cost {} ms", elapse);
}
} finally {
lock.writeLock().unlock();