You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/02/06 01:49:40 UTC
[iotdb] branch rel/0.11 updated: Add log for better tracing (#2637)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 8c0d246 Add log for better tracing (#2637)
8c0d246 is described below
commit 8c0d246d1f992c0f1814257b152a8eb90a709c18
Author: SilverNarcissus <15...@smail.nju.edu.cn>
AuthorDate: Fri Feb 5 19:49:25 2021 -0600
Add log for better tracing (#2637)
* add wal log
* fix issue
* fix bug
* add log in memtable flush and format code
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 151 ++++++++++-----------
.../iotdb/db/writelog/io/SingleFileLogReader.java | 12 +-
2 files changed, 82 insertions(+), 81 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index ce08298..6491db4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -47,82 +47,14 @@ public class MemTableFlushTask {
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final Future<?> encodingTaskFuture;
private final Future<?> ioTaskFuture;
- private RestorableTsFileIOWriter writer;
-
private final LinkedBlockingQueue<Object> ioTaskQueue =
new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing());
- private final LinkedBlockingQueue<Object> encodingTaskQueue =
+ private final LinkedBlockingQueue<Object> encodingTaskQueue =
new LinkedBlockingQueue<>(config.getEncodingTaskQueueSizeForFlushing());
+ private RestorableTsFileIOWriter writer;
private String storageGroup;
private IMemTable memTable;
-
-
- /**
- * @param memTable the memTable to flush
- * @param writer the writer where memTable will be flushed to (current tsfile writer or vm writer)
- * @param storageGroup current storage group
- */
-
- public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter writer, String storageGroup) {
- this.memTable = memTable;
- this.writer = writer;
- this.storageGroup = storageGroup;
- this.encodingTaskFuture = subTaskPoolManager.submit(encodingTask);
- this.ioTaskFuture = subTaskPoolManager.submit(ioTask);
- logger.debug("flush task of Storage group {} memtable {} is created ",
- storageGroup, memTable.getVersion());
- }
-
- /**
- * the function for flushing memtable.
- */
- public void syncFlushMemTable()
- throws ExecutionException, InterruptedException, IOException {
- logger.info("The memTable size of SG {} is {}, the avg series points num in chunk is {} ",
- storageGroup,
- memTable.memSize(),
- memTable.getTotalPointsNum() / memTable.getSeriesNumber());
- long start = System.currentTimeMillis();
- long sortTime = 0;
-
- for (String deviceId : memTable.getMemTableMap().keySet()) {
- encodingTaskQueue.put(new StartFlushGroupIOTask(deviceId));
- for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
- long startTime = System.currentTimeMillis();
- IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
- MeasurementSchema desc = series.getSchema();
- TVList tvList = series.getSortedTVList();
- sortTime += System.currentTimeMillis() - startTime;
- encodingTaskQueue.put(new Pair<>(tvList, desc));
- }
- encodingTaskQueue.put(new EndChunkGroupIoTask());
- }
- encodingTaskQueue.put(new TaskEnd());
- logger.debug(
- "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
- storageGroup, memTable.getVersion(), sortTime);
-
- try {
- encodingTaskFuture.get();
- } catch (InterruptedException | ExecutionException e) {
- ioTaskFuture.cancel(true);
- throw e;
- }
-
- ioTaskFuture.get();
-
- try {
- writer.writeVersion(memTable.getVersion());
- } catch (IOException e) {
- throw new ExecutionException(e);
- }
-
- logger.info(
- "Storage group {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
- storageGroup, memTable, System.currentTimeMillis() - start);
- }
-
private Runnable encodingTask = new Runnable() {
private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
TSDataType dataType) {
@@ -214,7 +146,6 @@ public class MemTableFlushTask {
storageGroup, memTable.getVersion(), memSerializeTime);
}
};
-
@SuppressWarnings("squid:S135")
private Runnable ioTask = () -> {
long ioTime = 0;
@@ -234,8 +165,7 @@ public class MemTableFlushTask {
this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
} else if (ioMessage instanceof TaskEnd) {
break;
- }
- else if (ioMessage instanceof IChunkWriter) {
+ } else if (ioMessage instanceof IChunkWriter) {
ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
chunkWriter.writeToFileWriter(this.writer);
} else {
@@ -251,11 +181,80 @@ public class MemTableFlushTask {
logger.debug("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
storageGroup, ioTime);
};
-
+
+ /**
+ * Creates the flush task and immediately submits its encoding and IO subtasks to subTaskPoolManager.
+ * @param memTable the memTable to flush
+ * @param writer the writer the memTable is flushed to (current tsfile writer or vm writer)
+ * @param storageGroup current storage group
+ */
+
+ public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter writer,
+ String storageGroup) {
+ this.memTable = memTable;
+ this.writer = writer;
+ this.storageGroup = storageGroup;
+ this.encodingTaskFuture = subTaskPoolManager.submit(encodingTask);
+ this.ioTaskFuture = subTaskPoolManager.submit(ioTask);
+ logger.debug("flush task of Storage group {} memtable {} is created ",
+ storageGroup, memTable.getVersion());
+ }
+
+ /**
+ * Flushes the memtable synchronously: queues each sorted series, then waits for both subtasks.
+ */
+ public void syncFlushMemTable() throws ExecutionException, InterruptedException, IOException {
+ // guard the average so that a memtable holding no series cannot cause a division by zero here
+ logger.info(
+ "The memTable size of SG {} is {}, the avg series points num in chunk is {}, total timeseries number is {}",
+ storageGroup, memTable.memSize(),
+ memTable.getSeriesNumber() == 0
+ ? 0 : memTable.getTotalPointsNum() / memTable.getSeriesNumber(),
+ memTable.getSeriesNumber());
+ long start = System.currentTimeMillis();
+ long sortTime = 0;
+
+ for (String deviceId : memTable.getMemTableMap().keySet()) {
+ encodingTaskQueue.put(new StartFlushGroupIOTask(deviceId));
+ for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
+ long startTime = System.currentTimeMillis();
+ IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
+ MeasurementSchema desc = series.getSchema();
+ TVList tvList = series.getSortedTVList();
+ sortTime += System.currentTimeMillis() - startTime;
+ encodingTaskQueue.put(new Pair<>(tvList, desc));
+ }
+ encodingTaskQueue.put(new EndChunkGroupIoTask());
+ }
+ encodingTaskQueue.put(new TaskEnd());
+ logger.debug(
+ "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
+ storageGroup, memTable.getVersion(), sortTime);
+ // if encoding fails or is interrupted, cancel the IO task as well before propagating the error
+ try {
+ encodingTaskFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ ioTaskFuture.cancel(true);
+ throw e;
+ }
+
+ ioTaskFuture.get();
+ // an IOException from writing the version is wrapped and surfaces as an ExecutionException
+ try {
+ writer.writeVersion(memTable.getVersion());
+ } catch (IOException e) {
+ throw new ExecutionException(e);
+ }
+
+ logger.info(
+ "Storage group {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
+ storageGroup, memTable, System.currentTimeMillis() - start);
+ }
+
static class TaskEnd {
-
+
TaskEnd() {
-
+
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java b/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
index d504cbc..0e1a611 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
@@ -37,9 +37,8 @@ import org.slf4j.LoggerFactory;
*/
public class SingleFileLogReader implements ILogReader {
- private static final Logger logger = LoggerFactory.getLogger(SingleFileLogReader.class);
public static final int LEAST_LOG_SIZE = 12; // size + checksum
-
+ private static final Logger logger = LoggerFactory.getLogger(SingleFileLogReader.class);
private DataInputStream logStream;
private String filepath;
@@ -91,7 +90,9 @@ public class SingleFileLogReader implements ILogReader {
batchLogReader = new BatchLogReader(ByteBuffer.wrap(buffer));
fileCorrupted = fileCorrupted || batchLogReader.isFileCorrupted();
} catch (Exception e) {
- logger.error("Cannot read more PhysicalPlans from {} because", filepath, e);
+ logger.error(
+ "Cannot read more PhysicalPlans from {}, successfully read index is {}. The reason is",
+ filepath, idx, e);
fileCorrupted = true;
return false;
}
@@ -100,11 +101,11 @@ public class SingleFileLogReader implements ILogReader {
@Override
public PhysicalPlan next() {
- if (!hasNext()){
+ if (!hasNext()) {
throw new NoSuchElementException();
}
- idx ++;
+ idx++;
return batchLogReader.next();
}
@@ -122,6 +123,7 @@ public class SingleFileLogReader implements ILogReader {
public void open(File logFile) throws FileNotFoundException {
close();
logStream = new DataInputStream(new BufferedInputStream(new FileInputStream(logFile)));
+ logger.info("WAL file: {} size is {}", logFile.getName(), logFile.length());
this.filepath = logFile.getPath();
idx = 0;
}