You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/28 13:16:07 UTC
[incubator-iotdb] 01/03: fix query clone TVList bug
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 8d69b5ea79c3b0c5e402f0d7d1b62049620d045e
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jun 28 19:45:04 2019 +0800
fix query clone TVList bug
---
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 25 ++++---
.../db/engine/filenodeV2/TsFileResourceV2.java | 4 ++
.../filenodeV2/UnsealedTsFileProcessorV2.java | 21 +++---
.../iotdb/db/engine/memtable/AbstractMemTable.java | 44 +++++++++----
.../apache/iotdb/db/engine/memtable/IMemTable.java | 2 +
.../db/engine/memtable/MemTableFlushTaskV2.java | 2 -
.../db/engine/memtable/TimeValuePairSorter.java | 2 +-
.../db/engine/memtable/WritableMemChunkV2.java | 76 +++++++++++-----------
.../iotdb/db/utils/datastructure/TVList.java | 4 +-
.../engine/filenodeV2/FileNodeProcessorV2Test.java | 1 +
.../UnseqSeriesReaderByTimestampTest.java | 2 +-
11 files changed, 109 insertions(+), 74 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index a98a140..7615bc0 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -89,12 +89,13 @@ public class FileNodeProcessorV2 {
private String storageGroupName;
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
private final Object closeFileNodeCondition = new Object();
private final ThreadLocal<Long> timerr = new ThreadLocal<>();
+ private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
/**
* Mark whether to close file node
*/
@@ -372,7 +373,7 @@ public class FileNodeProcessorV2 {
// TODO need a read lock, please consider the concurrency with flush manager threads.
public QueryDataSourceV2 query(String deviceId, String measurementId)
throws FileNodeProcessorException {
- lock.readLock().lock();
+ insertLock.readLock().lock();
try {
List<TsFileResourceV2> seqResources = getFileReSourceListForQuery(sequenceFileList,
deviceId, measurementId);
@@ -380,13 +381,13 @@ public class FileNodeProcessorV2 {
deviceId, measurementId);
return new QueryDataSourceV2(new Path(deviceId, measurementId), seqResources, unseqResources);
} finally {
- lock.readLock().unlock();
+ insertLock.readLock().unlock();
}
}
private void writeLock() {
long time = System.currentTimeMillis();
- lock.writeLock().lock();
+ insertLock.writeLock().lock();
time = System.currentTimeMillis() - time;
if (time > 1000) {
LOGGER.info("storage group {} wait for write lock cost: {}", storageGroupName, time, new RuntimeException());
@@ -395,7 +396,7 @@ public class FileNodeProcessorV2 {
}
private void writeUnlock() {
- lock.writeLock().unlock();
+ insertLock.writeLock().unlock();
long time = System.currentTimeMillis() - timerr.get();
if (time > 1000) {
LOGGER.info("storage group {} take lock for {}ms", storageGroupName, time, new RuntimeException());
@@ -418,8 +419,9 @@ public class FileNodeProcessorV2 {
if (!tsFileResource.containsDevice(deviceId)) {
continue;
}
- synchronized (tsFileResource) {
- if (!tsFileResource.getStartTimeMap().isEmpty()) {
+ if (!tsFileResource.getStartTimeMap().isEmpty()) {
+ closeQueryLock.readLock().lock();
+ try {
if (tsFileResource.isClosed()) {
tsfileResourcesForQuery.add(tsFileResource);
} else {
@@ -432,8 +434,12 @@ public class FileNodeProcessorV2 {
throw new FileNodeProcessorException(e);
}
tsfileResourcesForQuery
- .add(new TsFileResourceV2(tsFileResource.getFile(), pair.left, pair.right));
+ .add(new TsFileResourceV2(tsFileResource.getFile(),
+ tsFileResource.getStartTimeMap(),
+ tsFileResource.getEndTimeMap(), pair.left, pair.right));
}
+ } finally {
+ closeQueryLock.readLock().unlock();
}
}
}
@@ -633,10 +639,13 @@ public class FileNodeProcessorV2 {
// TODO please consider concurrency with query and insert method.
public void closeUnsealedTsFileProcessorCallback(
UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
+ closeQueryLock.writeLock().lock();
try {
unsealedTsFileProcessor.close();
} catch (IOException e) {
LOGGER.error("storage group: {} close unsealedTsFileProcessor failed", storageGroupName, e);
+ } finally {
+ closeQueryLock.writeLock().unlock();
}
if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) {
closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index a60ccf5..e9366d0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -88,9 +88,13 @@ public class TsFileResourceV2 {
}
public TsFileResourceV2(File file,
+ Map<String, Long> startTimeMap,
+ Map<String, Long> endTimeMap,
ReadOnlyMemChunk readOnlyMemChunk,
List<ChunkMetaData> chunkMetaDatas) {
this.file = file;
+ this.startTimeMap = startTimeMap;
+ this.endTimeMap = endTimeMap;
this.chunkMetaDatas = chunkMetaDatas;
this.readOnlyMemChunk = readOnlyMemChunk;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 81c3cf0..b432f3e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -23,11 +23,13 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.memtable.EmptyMemTable;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -298,6 +300,7 @@ public class UnsealedTsFileProcessorV2 {
try {
writer.makeMetadataVisible();
flushingMemTables.remove(memTable);
+ memTable.release();
LOGGER.info("flush finished, remove a memtable from flushing list, "
+ "flushing memtable list size: {}", flushingMemTables.size());
} finally {
@@ -359,14 +362,10 @@ public class UnsealedTsFileProcessorV2 {
writer.endFile(fileSchema);
- flushQueryLock.writeLock().lock();
- try {
// remove this processor from Closing list in FileNodeProcessor, mark the TsFileResource closed, no need writer anymore
- closeUnsealedFileCallback.accept(this);
- writer = null;
- } finally {
- flushQueryLock.writeLock().unlock();
- }
+ closeUnsealedFileCallback.accept(this);
+
+ writer = null;
// delete the restore for this bufferwrite processor
if (LOGGER.isInfoEnabled()) {
@@ -440,12 +439,12 @@ public class UnsealedTsFileProcessorV2 {
if (!flushingMemTable.isManagedByMemPool()) {
continue;
}
- memSeriesLazyMerger
- .addMemSeries(flushingMemTable.query(deviceId, measurementId, dataType, props));
+ ReadOnlyMemChunk memChunk = flushingMemTable.query(deviceId, measurementId, dataType, props);
+ memSeriesLazyMerger.addMemSeries(memChunk);
}
if (workMemTable != null) {
- memSeriesLazyMerger
- .addMemSeries(workMemTable.query(deviceId, measurementId, dataType, props));
+ ReadOnlyMemChunk memChunk = workMemTable.query(deviceId, measurementId, dataType, props);
+ memSeriesLazyMerger.addMemSeries(memChunk);
}
// memSeriesLazyMerger has handled the props,
// so we do not need to handle it again in the following readOnlyMemChunk
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index fe3a5f5..8073704 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -24,13 +24,23 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsBinary;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsBoolean;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsDouble;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsFloat;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsInt;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsLong;
+import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public abstract class AbstractMemTable implements IMemTable {
@@ -143,11 +153,20 @@ public abstract class AbstractMemTable implements IMemTable {
@Override
public ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
Map<String, String> props) {
-
- return new ReadOnlyMemChunk(dataType, getSeriesData(deviceId,
- measurement, dataType), props);
+ TimeValuePairSorter sorter;
+ if (!checkPath(deviceId, measurement)) {
+ sorter = new WritableMemChunk(dataType);
+ } else {
+ long undeletedTime = findUndeletedTime(deviceId, measurement);
+ IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
+ IWritableMemChunk chunkCopy = new WritableMemChunkV2(dataType, memChunk.getTVList().clone());
+ chunkCopy.setTimeOffset(undeletedTime);
+ sorter = chunkCopy;
+ }
+ return new ReadOnlyMemChunk(dataType, sorter, props);
}
+
private long findUndeletedTime(String deviceId, String measurement) {
String path = deviceId + PATH_SEPARATOR + measurement;
long undeletedTime = 0;
@@ -162,16 +181,6 @@ public abstract class AbstractMemTable implements IMemTable {
return undeletedTime + 1;
}
- private TimeValuePairSorter getSeriesData(String deviceId, String measurement, TSDataType dataType) {
- if (!checkPath(deviceId, measurement)) {
- return new WritableMemChunk(dataType);
- }
- long undeletedTime = findUndeletedTime(deviceId, measurement);
- IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
- memChunk.setTimeOffset(undeletedTime);
- return memChunk;
- }
-
@Override
public boolean delete(String deviceId, String measurementId, long timestamp) {
Map<String, IWritableMemChunk> deviceMap = memTableMap.get(deviceId);
@@ -263,4 +272,13 @@ public abstract class AbstractMemTable implements IMemTable {
public TVListAllocator getTVListAllocator() {
return allocator;
}
+
+ @Override
+ public void release() {
+ for (Entry<String, Map<String, IWritableMemChunk>> entry: memTableMap.entrySet()) {
+ for (Entry<String, IWritableMemChunk> subEntry: entry.getValue().entrySet()) {
+ allocator.release(entry.getKey() + IoTDBConstant.PATH_SEPARATOR + subEntry.getKey(), subEntry.getValue().getTVList());
+ }
+ }
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 9f0a453..8408b54 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -102,4 +102,6 @@ public interface IMemTable {
void setTVListAllocator(TVListAllocator allocator);
TVListAllocator getTVListAllocator();
+
+ void release();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index a6f182d..6ef25a7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -143,8 +143,6 @@ public class MemTableFlushTaskV2 {
try {
writeOneSeries(encodingMessage.left, seriesWriter,
encodingMessage.right.getType());
- memTable.getTVListAllocator().release(currDevice + IoTDBConstant.PATH_SEPARATOR
- + encodingMessage.right.getMeasurementId(), encodingMessage.left);
ioTaskQueue.add(seriesWriter);
} catch (IOException e) {
LOGGER.error("Storage group {} memtable {}, encoding task error.", storageGroup,
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java
index ffbb7ec..12e36f7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.utils.TimeValuePair;
public interface TimeValuePairSorter {
/**
- * get the distinct sorted startTime.
+ * Get the distinct time-value pairs sorted by timestamp. Only for query.
*
* @return a List which contains all distinct {@link TimeValuePair}s in ascending order by
* timestamp.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java
index 14a8335..8d34e67 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java
@@ -39,6 +39,7 @@ public class WritableMemChunkV2 implements IWritableMemChunk {
private static final Logger LOGGER = LoggerFactory.getLogger(WritableMemChunkV2.class);
private TSDataType dataType;
private TVList list;
+ private List<TimeValuePair> sortedList;
public WritableMemChunkV2(TSDataType dataType, TVList list) {
this.dataType = dataType;
@@ -129,7 +130,7 @@ public class WritableMemChunkV2 implements IWritableMemChunk {
}
@Override
- public TVList getSortedTVList() {
+ public synchronized TVList getSortedTVList() {
list.sort();
return list;
}
@@ -155,42 +156,43 @@ public class WritableMemChunkV2 implements IWritableMemChunk {
}
@Override
- public List<TimeValuePair> getSortedTimeValuePairList() {
- List<TimeValuePair> result = new ArrayList<>();
- TVList cloneList = list.clone();
- cloneList.sort();
- for (int i = 0; i < cloneList.size(); i++) {
- long time = cloneList.getTime(i);
- if (time < cloneList.getTimeOffset() ||
- (i+1 < cloneList.size() && (time == cloneList.getTime(i+1)))) {
- continue;
- }
-
- switch (dataType) {
- case BOOLEAN:
- result.add(new TimeValuePair(time, new TsBoolean(cloneList.getBoolean(i))));
- break;
- case INT32:
- result.add(new TimeValuePair(time, new TsInt(cloneList.getInt(i))));
- break;
- case INT64:
- result.add(new TimeValuePair(time, new TsLong(cloneList.getLong(i))));
- break;
- case FLOAT:
- result.add(new TimeValuePair(time, new TsFloat(cloneList.getFloat(i))));
- break;
- case DOUBLE:
- result.add(new TimeValuePair(time, new TsDouble(cloneList.getDouble(i))));
- break;
- case TEXT:
- result.add(new TimeValuePair(time, new TsBinary(cloneList.getBinary(i))));
- break;
- default:
- LOGGER.error("don't support data type: {}", dataType);
- break;
- }
- }
- return result;
+ public synchronized List<TimeValuePair> getSortedTimeValuePairList() {
+ if (sortedList != null) {
+ return sortedList;
+ }
+ sortedList = new ArrayList<>();
+ list.sort();
+ for (int i = 0; i < list.size(); i++) {
+ long time = list.getTime(i);
+ if (time < list.getTimeOffset() ||
+ (i + 1 < list.size() && (time == list.getTime(i + 1)))) {
+ continue;
+ }
+ switch (dataType) {
+ case BOOLEAN:
+ sortedList.add(new TimeValuePair(time, new TsBoolean(list.getBoolean(i))));
+ break;
+ case INT32:
+ sortedList.add(new TimeValuePair(time, new TsInt(list.getInt(i))));
+ break;
+ case INT64:
+ sortedList.add(new TimeValuePair(time, new TsLong(list.getLong(i))));
+ break;
+ case FLOAT:
+ sortedList.add(new TimeValuePair(time, new TsFloat(list.getFloat(i))));
+ break;
+ case DOUBLE:
+ sortedList.add(new TimeValuePair(time, new TsDouble(list.getDouble(i))));
+ break;
+ case TEXT:
+ sortedList.add(new TimeValuePair(time, new TsBinary(list.getBinary(i))));
+ break;
+ default:
+ LOGGER.error("don't support data type: {}", dataType);
+ break;
+ }
+ }
+ return this.sortedList;
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 90779ff..35e9fd6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -144,7 +144,6 @@ public abstract class TVList {
public void reset() {
size = 0;
- limit = 0;
timeOffset = -1;
sorted = false;
}
@@ -164,6 +163,9 @@ public abstract class TVList {
}
protected void sort(int lo, int hi) {
+ if (sorted) {
+ return;
+ }
if (lo == hi) {
return;
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index 6c6120f..508a411 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -55,6 +55,7 @@ public class FileNodeProcessorV2Test {
@Test
public void testSequenceSyncClose() {
for (int j = 1; j <= 100; j++) {
+ System.out.println(j);
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/unsequence/UnseqSeriesReaderByTimestampTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/unsequence/UnseqSeriesReaderByTimestampTest.java
index 78dfdbf..822a9a5 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/unsequence/UnseqSeriesReaderByTimestampTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/unsequence/UnseqSeriesReaderByTimestampTest.java
@@ -74,7 +74,7 @@ public class UnseqSeriesReaderByTimestampTest {
TSRecord record = new TSRecord(2, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(100)));
FileNodeManagerV2.getInstance().insert(new InsertPlan(record));
-// FileNodeManagerV2.getInstance().asyncFlushAndSealAllFiles();
+ // FileNodeManagerV2.getInstance().asyncFlushAndSealAllFiles();
// query
List<Path> paths = new ArrayList<>();