You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/28 09:41:09 UTC
[incubator-iotdb] 01/02: fix a bug where endTimeMap is not updated when
a UFP is closed normally
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit faa9e8ec4a99217b4d5550bd15aa3598c4212267
Author: lta <li...@163.com>
AuthorDate: Fri Jun 28 17:23:24 2019 +0800
Fix a bug where endTimeMap is not updated when a UFP (UnsealedTsFileProcessor) is closed normally
---
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 14 ++++++-----
.../db/engine/filenodeV2/TsFileResourceV2.java | 27 ++++++++++++----------
.../filenodeV2/UnsealedTsFileProcessorV2.java | 24 ++++++++++++-------
.../engine/filenodeV2/FileNodeProcessorV2Test.java | 2 +-
.../filenodeV2/UnsealedTsFileProcessorV2Test.java | 9 ++++----
5 files changed, 45 insertions(+), 31 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index a98a140..0c64be3 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -288,10 +287,11 @@ public class FileNodeProcessorV2 {
// insert BufferWrite
long start2 = System.currentTimeMillis();
- result = unsealedTsFileProcessor.insert(insertPlan);
+ result = unsealedTsFileProcessor.insert(insertPlan, sequence);
start2 = System.currentTimeMillis() - start2;
if (start2 > 1000) {
- LOGGER.info("FNP {} insert a record into unsealed file processor cost: {}", storageGroupName, start2);
+ LOGGER.info("FNP {} insert a record into unsealed file processor cost: {}", storageGroupName,
+ start2);
}
// try to update the latest time of the device of this tsRecord
@@ -355,6 +355,7 @@ public class FileNodeProcessorV2 {
if (sequence) {
closingSequenceTsFileProcessor.add(unsealedTsFileProcessor);
workSequenceTsFileProcessor = null;
+ updateEndTimeMap(unsealedTsFileProcessor);
} else {
closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor);
workUnSequenceTsFileProcessor = null;
@@ -389,7 +390,8 @@ public class FileNodeProcessorV2 {
lock.writeLock().lock();
time = System.currentTimeMillis() - time;
if (time > 1000) {
- LOGGER.info("storage group {} wait for write lock cost: {}", storageGroupName, time, new RuntimeException());
+ LOGGER.info("storage group {} wait for write lock cost: {}", storageGroupName, time,
+ new RuntimeException());
}
timerr.set(System.currentTimeMillis());
}
@@ -398,7 +400,8 @@ public class FileNodeProcessorV2 {
lock.writeLock().unlock();
long time = System.currentTimeMillis() - timerr.get();
if (time > 1000) {
- LOGGER.info("storage group {} take lock for {}ms", storageGroupName, time, new RuntimeException());
+ LOGGER.info("storage group {} take lock for {}ms", storageGroupName, time,
+ new RuntimeException());
}
}
@@ -535,7 +538,6 @@ public class FileNodeProcessorV2 {
if (workUnSequenceTsFileProcessor != null) {
closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
workUnSequenceTsFileProcessor.asyncClose();
- updateEndTimeMap(workUnSequenceTsFileProcessor);
workUnSequenceTsFileProcessor = null;
}
} finally {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index a60ccf5..d226517 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -22,18 +22,14 @@ import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -51,8 +47,7 @@ public class TsFileResourceV2 {
private Map<String, Long> startTimeMap;
/**
- * device -> end time
- * null if it's an unsealed tsfile
+ * device -> end time null if it's an unsealed tsfile
*/
private Map<String, Long> endTimeMap;
@@ -96,7 +91,8 @@ public class TsFileResourceV2 {
}
public void serialize() throws IOException {
- try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file + RESOURCE_SUFFIX))){
+ try (OutputStream outputStream = new BufferedOutputStream(
+ new FileOutputStream(file + RESOURCE_SUFFIX))) {
ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream);
for (Entry<String, Long> entry : this.startTimeMap.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
@@ -111,7 +107,8 @@ public class TsFileResourceV2 {
}
public void deSerialize() throws IOException {
- try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file + RESOURCE_SUFFIX))) {
+ try (InputStream inputStream = new BufferedInputStream(
+ new FileInputStream(file + RESOURCE_SUFFIX))) {
int size = ReadWriteIOUtils.readInt(inputStream);
Map<String, Long> startTimes = new HashMap<>();
for (int i = 0; i < size; i++) {
@@ -132,10 +129,16 @@ public class TsFileResourceV2 {
}
public void updateStartTime(String device, long time) {
- startTimeMap.putIfAbsent(device, time);
- long startTime = startTimeMap.get(device);
- if (time < startTimeMap.get(device)) {
- startTimeMap.put(device, startTime);
+ long startTime = startTimeMap.getOrDefault(device, Long.MAX_VALUE);
+ if (time < startTime) {
+ startTimeMap.put(device, time);
+ }
+ }
+
+ public void updateEndTime(String device, long time) {
+ long endTime = endTimeMap.getOrDefault(device, Long.MIN_VALUE);
+ if (time > endTime) {
+ endTimeMap.put(device, time);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 81c3cf0..c01c33c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -116,7 +116,7 @@ public class UnsealedTsFileProcessorV2 {
* @param insertPlan physical plan of insertion
* @return succeed or fail
*/
- public boolean insert(InsertPlan insertPlan) {
+ public boolean insert(InsertPlan insertPlan, boolean sequence) {
long start1 = System.currentTimeMillis();
if (workMemTable == null) {
@@ -144,13 +144,17 @@ public class UnsealedTsFileProcessorV2 {
}
// update start time of this memtable
tsFileResource.updateStartTime(insertPlan.getDeviceId(), insertPlan.getTime());
+ if (!sequence) {
+ tsFileResource.updateEndTime(insertPlan.getDeviceId(), insertPlan.getTime());
+ }
long start2 = System.currentTimeMillis();
// insert tsRecord to work memtable
workMemTable.insert(insertPlan);
start2 = System.currentTimeMillis() - start2;
if (start2 > 1000) {
- LOGGER.info("UFP {} insert into memtable cost: {}, insertPlan: {}, current data points in memtable: {}",
+ LOGGER.info(
+ "UFP {} insert into memtable cost: {}, insertPlan: {}, current data points in memtable: {}",
storageGroupName, start2, insertPlan, workMemTable.size());
}
@@ -212,8 +216,8 @@ public class UnsealedTsFileProcessorV2 {
}
/**
- * Ensure there must be a flush thread submitted after setCloseMark() is called,
- * therefore the close task will be executed by a flush thread.
+ * Ensure there must be a flush thread submitted after setCloseMark() is called, therefore the
+ * close task will be executed by a flush thread.
*/
public void asyncClose() {
flushQueryLock.writeLock().lock();
@@ -221,7 +225,9 @@ public class UnsealedTsFileProcessorV2 {
try {
IMemTable tmpMemTable = workMemTable == null ? new EmptyMemTable() : workMemTable;
if (!tmpMemTable.isManagedByMemPool()) {
- LOGGER.info("storage group {} add an empty memtable into flushing memtable list when async close", storageGroupName);
+ LOGGER.info(
+ "storage group {} add an empty memtable into flushing memtable list when async close",
+ storageGroupName);
} else {
LOGGER.info("storage group {} async flush a memtable when async close", storageGroupName);
}
@@ -299,7 +305,7 @@ public class UnsealedTsFileProcessorV2 {
writer.makeMetadataVisible();
flushingMemTables.remove(memTable);
LOGGER.info("flush finished, remove a memtable from flushing list, "
- + "flushing memtable list size: {}", flushingMemTables.size());
+ + "flushing memtable list size: {}", flushingMemTables.size());
} finally {
flushQueryLock.writeLock().unlock();
}
@@ -317,7 +323,8 @@ public class UnsealedTsFileProcessorV2 {
// null memtable only appears when calling asyncClose()
if (memTableToFlush.isManagedByMemPool()) {
- MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer, storageGroupName,
+ MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer,
+ storageGroupName,
this::releaseFlushedMemTableCallback);
flushTask.flushMemTable();
long start = System.currentTimeMillis();
@@ -397,7 +404,8 @@ public class UnsealedTsFileProcessorV2 {
public void close() throws IOException {
tsFileResource.close();
- MultiFileLogNodeManager.getInstance().deleteNode(storageGroupName + "-" + tsFileResource.getFile().getName());
+ MultiFileLogNodeManager.getInstance()
+ .deleteNode(storageGroupName + "-" + tsFileResource.getFile().getName());
}
public void setManagedByFlushManager(boolean managedByFlushManager) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index 6c6120f..e53c351 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -60,7 +60,7 @@ public class FileNodeProcessorV2Test {
processor.insert(new InsertPlan(record));
processor.asyncForceClose();
}
-
+
processor.syncCloseFileNode();
QueryDataSourceV2 queryDataSource = null;
try {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index 94f5f1b..2331391 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -19,7 +19,8 @@
package org.apache.iotdb.db.engine.filenodeV2;
import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import java.io.File;
import java.io.IOException;
@@ -86,7 +87,7 @@ public class UnsealedTsFileProcessorV2Test {
for (int i = 1; i <= 100; i++) {
TSRecord record = new TSRecord(i, deviceId);
record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
- processor.insert(new InsertPlan(record));
+ processor.insert(new InsertPlan(record), true);
}
// query data in memory
@@ -134,7 +135,7 @@ public class UnsealedTsFileProcessorV2Test {
for (int i = 1; i <= 10; i++) {
TSRecord record = new TSRecord(i, deviceId);
record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
- processor.insert(new InsertPlan(record));
+ processor.insert(new InsertPlan(record), true);
}
processor.asyncFlush();
}
@@ -177,7 +178,7 @@ public class UnsealedTsFileProcessorV2Test {
for (int i = 1; i <= 100; i++) {
TSRecord record = new TSRecord(i, deviceId);
record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
- processor.insert(new InsertPlan(record));
+ processor.insert(new InsertPlan(record), true);
}
// query data in memory