You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/06 02:57:22 UTC
[incubator-iotdb] branch refactor_overflow updated: fix tests
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch refactor_overflow
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/refactor_overflow by this push:
new b34cb2a fix tests
b34cb2a is described below
commit b34cb2a7f3cdf0fdac91a79d7790da26a56a905c
Author: 江天 <jt...@163.com>
AuthorDate: Thu Jun 6 10:55:13 2019 +0800
fix tests
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 +-
.../org/apache/iotdb/db/engine/DatabaseEngine.java | 3 +-
.../db/engine/sgmanager/StorageGroupManager.java | 17 +-
.../db/engine/sgmanager/StorageGroupProcessor.java | 196 +++++++--------
.../tsfiledata/RestorableTsFileIOWriter.java | 8 +
.../db/engine/tsfiledata/TsFileProcessor.java | 15 +-
.../exception/BufferWriteProcessorException.java | 41 ----
.../iotdb/db/exception/ErrorDebugException.java | 37 ---
.../db/exception/FileNodeNotExistException.java | 33 ---
.../db/exception/FileNodeProcessorException.java | 44 ----
.../iotdb/db/exception/NotConsistentException.java | 29 ---
.../db/exception/OverflowProcessorException.java | 41 ----
.../exception/OverflowWrongParameterException.java | 42 ----
.../db/exception/ProcessorRuntimException.java | 41 ----
...ption.java => QueryEngineRunningException.java} | 12 +-
.../iotdb/db/exception/TooManyChunksException.java | 45 ----
.../db/exception/UnSupportedFillTypeException.java | 2 +-
.../UnSupportedOverflowOpTypeException.java | 42 ----
.../apache/iotdb/db/monitor/MonitorConstants.java | 2 +-
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 4 +-
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 2 +-
.../iotdb/db/qp/physical/crud/InsertPlan.java | 4 +
.../iotdb/db/query/control/FileReaderManager.java | 1 +
.../db/query/factory/SeriesReaderFactory.java | 55 +++--
.../query/reader/sequence/SealedTsFilesReader.java | 11 +-
.../sequence/SealedTsFilesReaderByTimestamp.java | 20 +-
.../reader/sequence/UnSealedTsFileReader.java | 10 +-
.../iotdb/db/sync/receiver/SyncServiceImpl.java | 6 +-
.../org/apache/iotdb/db/utils/OpenFileNumUtil.java | 2 +-
.../java/org/apache/iotdb/db/utils/QueryUtils.java | 29 ++-
.../writelog/manager/MultiFileLogNodeManager.java | 7 +-
.../db/writelog/replay/ConcreteLogReplayer.java | 2 +-
.../java/org/apache/iotdb/db/engine/PathUtils.java | 2 +-
.../org/apache/iotdb/db/engine/ProcessorTest.java | 34 +--
.../engine/bufferwrite/BufferWriteBenchmark.java | 128 ----------
.../bufferwrite/BufferWriteProcessorNewTest.java | 173 -------------
.../bufferwrite/BufferWriteProcessorTest.java | 258 --------------------
.../bufferwrite/RestorableTsFileIOWriterTest.java | 267 ---------------------
.../filenode/FileNodeProcessorStoreTest.java | 91 -------
.../db/engine/filenode/TsFileResourceTest.java | 98 --------
.../filenodev2/FileNodeManagerBenchmark.java | 125 ----------
.../db/engine/memcontrol/MemControllerTest.java | 6 +-
.../memcontrol/OverflowFileSizeControlTest.java | 143 -----------
.../memcontrol/OverflowMetaSizeControlTest.java | 144 -----------
...rolTest.java => TsFileMetaSizeControlTest.java} | 91 ++-----
...ControlTest.java => TsFileSizeControlTest.java} | 96 +++-----
.../engine/modification/DeletionFileNodeTest.java | 134 ++++++-----
.../db/engine/modification/DeletionQueryTest.java | 161 +++++++------
.../db/engine/overflow/io/OverflowIOTest.java | 65 -----
.../engine/overflow/io/OverflowMemtableTest.java | 100 --------
.../overflow/io/OverflowProcessorBenchmark.java | 125 ----------
.../engine/overflow/io/OverflowProcessorTest.java | 211 ----------------
.../engine/overflow/io/OverflowResourceTest.java | 92 -------
.../db/engine/overflow/io/OverflowTestUtils.java | 77 ------
.../overflow/metadata/OFFileMetadataTest.java | 90 -------
.../metadata/OFRowGroupListMetadataTest.java | 93 -------
.../metadata/OFSeriesListMetadataTest.java | 88 -------
.../overflow/metadata/OverflowTestHelper.java | 84 -------
.../db/engine/overflow/metadata/OverflowUtils.java | 138 -----------
.../engine/overflowdata/OverflowProcessorTest.java | 41 ++--
.../db/engine/tsfiledata/TsFileProcessorTest.java | 223 +++++++++--------
.../integration/QueryDataFromUnclosedTsFileIT.java | 44 ++--
.../org/apache/iotdb/db/monitor/MonitorTest.java | 32 +--
.../iotdb/db/qp/plan/LogicalPlanSmallTest.java | 26 +-
.../apache/iotdb/db/qp/plan/PhysicalPlanTest.java | 21 +-
.../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 40 +--
.../apache/iotdb/db/utils/EnvironmentUtils.java | 32 +--
.../iotdb/db/writelog/IoTDBLogFileSizeTest.java | 42 +---
.../apache/iotdb/db/writelog/PerformanceTest.java | 44 ++--
.../org/apache/iotdb/db/writelog/RecoverTest.java | 83 ++-----
.../iotdb/db/writelog/WriteLogNodeManagerTest.java | 13 +-
.../apache/iotdb/db/writelog/WriteLogNodeTest.java | 43 +---
.../iotdb/tsfile/file/metadata/ChunkMetaData.java | 3 +
.../read/reader/series/FileSeriesReader.java | 2 +
74 files changed, 750 insertions(+), 3870 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index cb4b799..8ab170c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -88,9 +88,9 @@ public class IoTDBConfig {
private String[] overflowDataDirs = {DEFAULT_OVERFLOW_DIR};
/**
- * Data directory of fileNode data.
+ * Data directory of StorageGroup data.
*/
- private String fileNodeDir = "info";
+ private String storageGroupDir = "info";
/**
* Data directory of bufferWrite data.
@@ -332,7 +332,7 @@ public class IoTDBConfig {
if (getSysDir().length() > 0 && !getSysDir().endsWith(File.separator)) {
setSysDir(getSysDir() + File.separatorChar);
}
- setFileNodeDir(getSysDir() + getFileNodeDir());
+ setStorageGroupDir(getSysDir() + getStorageGroupDir());
setMetadataDir(getSysDir() + getMetadataDir());
// update the paths of subdirectories in the walDir
@@ -514,12 +514,12 @@ public class IoTDBConfig {
this.overflowDataDirs = overflowDataDirs;
}
- public String getFileNodeDir() {
- return fileNodeDir;
+ public String getStorageGroupDir() {
+ return storageGroupDir;
}
- public void setFileNodeDir(String fileNodeDir) {
- this.fileNodeDir = fileNodeDir;
+ public void setStorageGroupDir(String storageGroupDir) {
+ this.storageGroupDir = storageGroupDir;
}
public void setBufferWriteDirs(String[] bufferWriteDirs) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/DatabaseEngine.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/DatabaseEngine.java
index 71f278d..95a27b0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/DatabaseEngine.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/DatabaseEngine.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.engine.datasource.QueryDataSource;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
+import org.apache.iotdb.db.monitor.IStatistic;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.service.IService;
@@ -37,7 +38,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
/**
* DatabaseEngine is an abstraction of IoTDB storage-level interfaces.
*/
-public interface DatabaseEngine extends IService {
+public interface DatabaseEngine extends IService, IStatistic {
/**
* This function is just for unit test.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java
index b7260ea..9322728 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java
@@ -37,20 +37,18 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.DatabaseEngine;
-import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.engine.datasource.QueryDataSource;
+import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.monitor.IStatistic;
import org.apache.iotdb.db.monitor.MonitorConstants;
import org.apache.iotdb.db.monitor.MonitorConstants.StorageGroupManagerStatConstants;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -66,7 +64,7 @@ import org.slf4j.LoggerFactory;
* StorageGroupManager provides top-level interfaces to access IoTDB storage engine. It decides
* which StorageGroup(s) to access in order to complete a query.
*/
-public class StorageGroupManager implements IStatistic, IService, DatabaseEngine {
+public class StorageGroupManager implements DatabaseEngine {
private static final Logger LOGGER = LoggerFactory.getLogger(StorageGroupManager.class);
private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig();
@@ -237,15 +235,15 @@ public class StorageGroupManager implements IStatistic, IService, DatabaseEngine
throw new StorageGroupManagerException("Restoring all StorageGroups failed.", e);
}
for (String storageGroupName : storageGroupNames) {
- StorageGroupProcessor StorageGroupProcessor = null;
+ StorageGroupProcessor storageGroupProcessor = null;
try {
- StorageGroupProcessor = getProcessor(storageGroupName, true);
+ storageGroupProcessor = getProcessor(storageGroupName, true);
} catch (StorageGroupManagerException e) {
throw new StorageGroupManagerException(String.format("Restoring StorageGroup %s failed.",
storageGroupName), e);
} finally {
- if (StorageGroupProcessor != null) {
- StorageGroupProcessor.writeUnlock();
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.writeUnlock();
}
}
}
@@ -307,8 +305,7 @@ public class StorageGroupManager implements IStatistic, IService, DatabaseEngine
@Override
public void update(String deviceId, String measurementId, long startTime, long endTime,
- TSDataType type, String v)
- throws StorageGroupManagerException {
+ TSDataType type, String v) {
throw new UnsupportedOperationException("Method unimplemented");
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
index 586955b..b5a704e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
@@ -59,7 +59,6 @@ import org.apache.iotdb.db.engine.datasource.SeriesDataSource;
import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.FileNodeProcessorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
@@ -86,8 +85,6 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -102,7 +99,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
private static final String OLD_FILE_RECORD = "MERGE_OLD_FILES";
private final String statStorageGroupName;
- private String fileNodeDir;
+ private String storageGroupDir;
private TsFileProcessor tsFileProcessor;
private OverflowProcessor overflowProcessor;
@@ -135,12 +132,11 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
private ModificationFile mergingModification;
private TsFileIOWriter mergeFileWriter = null;
- private String mergeOutputPath = null;
- private String mergeBaseDir = null;
- private String mergeFileName = null;
private boolean mergeIsChunkGroupHasData = false;
private long mergeStartPos;
+ private Map<String, AtomicLong> statisticMap;
+
/**
* Construct processor using StorageGroup name
*/
@@ -151,14 +147,15 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
fileSchema = constructFileSchema(processorName);
- File systemFolder = new File(IoTDBDescriptor.getInstance().getConfig().getFileNodeDir(),
+ File systemFolder = new File(IoTDBDescriptor.getInstance().getConfig().getStorageGroupDir(),
processorName);
if (!systemFolder.exists()) {
systemFolder.mkdirs();
- LOGGER.info("The directory of the filenode processor {} doesn't exist. Create new directory {}",
+ LOGGER.info("The directory of the storage group processor {} doesn't exist. Create new "
+ + "directory {}",
getProcessorName(), systemFolder.getAbsolutePath());
}
- fileNodeDir = systemFolder.getAbsolutePath();
+ storageGroupDir = systemFolder.getAbsolutePath();
try {
cleanLastMerge();
} catch (IOException e) {
@@ -181,15 +178,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
+ processorName.replaceAll("\\.", "_");
// RegisterStatService
- if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
- String statStorageDeltaName =
- MonitorConstants.STAT_STORAGE_GROUP_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR
- + MonitorConstants.FILE_NODE_PATH + MonitorConstants.MONITOR_PATH_SEPARATOR
- + processorName.replaceAll("\\.", "_");
- StatMonitor statMonitor = StatMonitor.getInstance();
- registerStatMetadata();
- statMonitor.registerStatistics(statStorageDeltaName, this);
- }
+ registerStatMetadata();
}
public OperationResult insert(InsertPlan insertPlan) throws TsFileProcessorException {
@@ -287,14 +276,28 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
@Override
public void registerStatMetadata() {
- Map<String, String> hashMap = new HashMap<>();
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+ String statStorageDeltaName =
+ MonitorConstants.STAT_STORAGE_GROUP_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR
+ + MonitorConstants.FILE_NODE_PATH + MonitorConstants.MONITOR_PATH_SEPARATOR
+ + processorName.replaceAll("\\.", "_");
+ StatMonitor statMonitor = StatMonitor.getInstance();
+ statMonitor.registerStatistics(statStorageDeltaName, this);
+ Map<String, String> typeMap = new HashMap<>();
+ for (StorageGroupProcessorStatConstants statConstant :
+ StorageGroupProcessorStatConstants.values()) {
+ typeMap
+ .put(statStorageGroupName + MonitorConstants.MONITOR_PATH_SEPARATOR + statConstant.name(),
+ MonitorConstants.DATA_TYPE_INT64);
+ }
+ StatMonitor.getInstance().registerStatStorageGroup(typeMap);
+ }
+
+ statisticMap = new HashMap<>();
for (StorageGroupProcessorStatConstants statConstant :
StorageGroupProcessorStatConstants.values()) {
- hashMap
- .put(statStorageGroupName + MonitorConstants.MONITOR_PATH_SEPARATOR + statConstant.name(),
- MonitorConstants.DATA_TYPE_INT64);
+ statisticMap.put(statConstant.name(), new AtomicLong(0));
}
- StatMonitor.getInstance().registerStatStorageGroup(hashMap);
}
@Override
@@ -310,7 +313,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
@Override
public Map<String, AtomicLong> getStatParamsHashMap() {
- return null;
+ return statisticMap;
}
private FileSchema constructFileSchema(String processorName) {
@@ -416,7 +419,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
/**
* Assign a token to a new query and put it into the newSet so that we know which
*/
- public int addMultiPassCount() {
+ int addMultiPassCount() {
LOGGER.debug("Add MultiPassCount: read lock newMultiPassCount.");
Integer token = newMultiPassCount.incrementAndGet();
@@ -428,7 +431,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
/**
* decrease multiple pass count.
*/
- public void decreaseMultiPassCount(int token) {
+ void decreaseMultiPassCount(int token) {
if (newMultiPassTokenSet.contains(token)) {
newMultiPassTokenSet.remove(token);
LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, count:{}", token,
@@ -454,7 +457,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
* @param srcFilePath the path of the source file
*/
@SuppressWarnings("ResultOfMethodCallIgnored")
- public void appendFile(TsFileResource destFileResource, String srcFilePath)
+ void appendFile(TsFileResource destFileResource, String srcFilePath)
throws TsFileProcessorException {
try {
if (!destFileResource.getFile().getParentFile().exists()) {
@@ -464,11 +467,11 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
File srcFile = new File(srcFilePath);
File destFile = destFileResource.getFile();
if (!srcFile.exists()) {
- throw new FileNodeProcessorException(
+ throw new StorageGroupProcessorException(
String.format("The source file %s does not exist.", srcFilePath));
}
if (destFile.exists()) {
- throw new FileNodeProcessorException(
+ throw new StorageGroupProcessorException(
String.format("The destination file %s already exists.",
destFileResource.getFile().getAbsolutePath()));
}
@@ -478,9 +481,10 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
}
// append the new tsfile
this.tsFileProcessor.appendFile(destFileResource);
- // reconstruct the inverted index of the newFileNodes
+ // reconstruct the inverted index of the new storage groups
} catch (Exception e) {
- LOGGER.error("Failed to append the tsfile {} to filenode processor {}.", destFileResource,
+ LOGGER.error("Failed to append the tsfile {} to storage group processor {}.",
+ destFileResource,
getProcessorName());
throw new TsFileProcessorException(e);
}
@@ -491,7 +495,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
*
* @param appendFile the appended tsfile information
*/
- public List<String> getOverlapFiles(TsFileResource appendFile, String uuid)
+ List<String> getOverlapFiles(TsFileResource appendFile, String uuid)
throws TsFileProcessorException {
return tsFileProcessor.getOverlapFiles(appendFile, uuid);
}
@@ -504,7 +508,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
* @param compressor
* @param props
*/
- public void addTimeSeries(String measurementId, TSDataType dataType, TSEncoding encoding,
+ void addTimeSeries(String measurementId, TSDataType dataType, TSEncoding encoding,
CompressionType compressor, Map<String, String> props) {
fileSchema.registerMeasurement(new MeasurementSchema(measurementId, dataType, encoding,
compressor, props));
@@ -513,7 +517,8 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
/**
* submit the merge task to the <code>MergePool</code>.
*
- * @return null -can't submit the merge task, because this filenode is not overflowed or it is
+ * @return null -can't submit the merge task, because this storage group is not overflowed or
+ * it is
* merging now. Future - submit the merge task successfully.
*/
Future submitToMerge() {
@@ -522,7 +527,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
long thisMergeTime = System.currentTimeMillis();
long mergeTimeInterval = thisMergeTime - lastMergeTime;
LOGGER.info(
- "The filenode {} last merge time is {}, this merge time is {}, "
+ "The storage group {} last merge time is {}, this merge time is {}, "
+ "merge time interval is {}s",
processorName, ofInstant(Instant.ofEpochMilli(lastMergeTime),
zoneId), ofInstant(Instant.ofEpochMilli(thisMergeTime),
@@ -546,12 +551,12 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
if (!isMerging) {
Callable<Exception> mergeThread;
mergeThread = new MergeTask(this);
- LOGGER.info("Submit the merge task, the merge filenode is {}", processorName);
+ LOGGER.info("Submit the merge task, the merge storage group is {}", processorName);
return MergeManager.getInstance().submit(mergeThread);
} else {
LOGGER.warn(
"Skip this merge task submission, because last merge task is not over yet, "
- + "the merge filenode processor is {}",
+ + "the merge storage group is {}",
processorName);
}
return null;
@@ -746,7 +751,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
}
private File getMergeFileRecord() {
- return new File(fileNodeDir, OLD_FILE_RECORD);
+ return new File(storageGroupDir, OLD_FILE_RECORD);
}
private void recordOldFiles(TsFileResource newFile,
@@ -781,11 +786,11 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
// min data time of all files
long minimumTime = resolveMinimumTime(seqGroup, unseqGroup);
- mergeBaseDir = Directories.getInstance().getNextFolderForTsfile();
- mergeFileName = minimumTime
+ String mergeBaseDir = Directories.getInstance().getNextFolderForTsfile();
+ String mergeFileName = minimumTime
+ EngineConstants.TSFILE_NAME_SEPARATOR + System.currentTimeMillis()
+ MERGE_TEMP_SUFFIX;
- mergeOutputPath = constructOutputFilePath(mergeBaseDir, getProcessorName(),
+ String mergeOutputPath = constructOutputFilePath(mergeBaseDir, getProcessorName(),
mergeFileName);
mergeFileName = getProcessorName() + File.separatorChar + mergeFileName;
try {
@@ -801,54 +806,9 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
Map<String, Long> startTimeMap = new HashMap<>();
Map<String, Long> endTimeMap = new HashMap<>();
- QueryContext context = new QueryContext();
- IReader seriesReader = null;
- ChunkGroupFooter footer;
- int numOfChunks = 0;
- for (Entry<String, List<Path>> entry : pathMap.entrySet()) {
- // start writing a device
- String deviceId = entry.getKey();
- List<Path> paths = entry.getValue();
-
- for (Path path : paths) {
- // start writing a series
- try {
- // sequence
- SeriesDataSource seqSource = new SeriesDataSource(path, seqGroup, null, null);
- SeriesDataSource unseqSource = new SeriesDataSource(path, unseqGroup, null, null);
-
- seriesReader = SeriesReaderFactory.getInstance().createSeriesReaderForMerge(seqSource,
- unseqSource, context);
- TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath());
- // end writing a series
- numOfChunks += queryAndWriteSeries(seriesReader, path, null, dataType,
- startTimeMap, endTimeMap);
- } catch (PathErrorException | IOException e) {
- LOGGER.error("{}: error occurred when merging {}", processorName, path.getFullPath(), e);
- } finally {
- if (seriesReader != null) {
- try {
- seriesReader.close();
- } catch (IOException e) {
- LOGGER.error("{}: cannot close series reader of {} when merge", processorName,
- path.getFullPath(), e);
- }
- }
- }
- }
-
- // end writing a device
- try {
- if (mergeIsChunkGroupHasData) {
- // end the new rowGroupMetadata
- long size = mergeFileWriter.getPos() - mergeStartPos;
- footer = new ChunkGroupFooter(deviceId, size, numOfChunks);
- mergeFileWriter.endChunkGroup(footer, 0);
- }
- } catch (IOException e) {
- LOGGER.error("{}: Cannot end ChunkGroup of {} when merge", processorName, deviceId, e);
- }
+ for (Entry<String, List<Path>> entry : pathMap.entrySet()) {
+ mergeDevice(entry.getKey(), entry.getValue(), seqGroup, unseqGroup, startTimeMap, endTimeMap);
}
try {
@@ -868,12 +828,56 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
newResource.setModFile(mergingModification);
mergingModification = null;
mergeFileWriter = null;
- mergeOutputPath = null;
- mergeBaseDir = null;
- mergeFileName = null;
return newResource;
}
+ private void mergeDevice(String deviceId, List<Path> paths, List<TsFileResource> seqGroup,
+ List<TsFileResource> unseqGroup, Map<String, Long> startTimeMap,
+ Map<String, Long> endTimeMap) {
+ QueryContext context = new QueryContext();
+ IReader seriesReader = null;
+ ChunkGroupFooter footer;
+ int numOfChunks = 0;
+ // start writing a device
+ for (Path path : paths) {
+ // start writing a series
+ try {
+ // sequence
+ SeriesDataSource seqSource = new SeriesDataSource(path, seqGroup, null, null);
+ SeriesDataSource unseqSource = new SeriesDataSource(path, unseqGroup, null, null);
+
+ seriesReader = SeriesReaderFactory.getInstance().createSeriesReaderForMerge(seqSource,
+ unseqSource, context);
+ TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath());
+ // end writing a series
+ numOfChunks += queryAndWriteSeries(seriesReader, path, null, dataType,
+ startTimeMap, endTimeMap);
+ } catch (PathErrorException | IOException e) {
+ LOGGER.error("{}: error occurred when merging {}", processorName, path.getFullPath(), e);
+ } finally {
+ if (seriesReader != null) {
+ try {
+ seriesReader.close();
+ } catch (IOException e) {
+ LOGGER.error("{}: cannot close series reader of {} when merge", processorName,
+ path.getFullPath(), e);
+ }
+ }
+ }
+ }
+ // end writing a device
+ try {
+ if (mergeIsChunkGroupHasData) {
+ // end the new rowGroupMetadata
+ long size = mergeFileWriter.getPos() - mergeStartPos;
+ footer = new ChunkGroupFooter(deviceId, size, numOfChunks);
+ mergeFileWriter.endChunkGroup(footer, 0);
+ }
+ } catch (IOException e) {
+ LOGGER.error("{}: Cannot end ChunkGroup of {} when merge", processorName, deviceId, e);
+ }
+ }
+
private Map<String, List<Path>> resolveMergePaths(List<TsFileResource> seqGroup, List<TsFileResource> unseqGroup)
throws PathErrorException {
Map<String, List<Path>> paths = new HashMap<>();
@@ -941,7 +945,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
int numOfChunk = 0;
if (!seriesReader.hasNext()) {
LOGGER.debug(
- "The time-series {} has no data with the filter {} in the filenode processor {}",
+ "The time-series {} has no data with the filter {} in the storage group {}",
path, seriesFilter, getProcessorName());
} else {
numOfChunk ++;
@@ -1026,9 +1030,13 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
// if system crushes during last merge, there may be undeleted old files
// these files will be deleted in this method
File mergeRecord = getMergeFileRecord();
+ if (!mergeRecord.exists()) {
+ return;
+ }
+
List<String> filePaths = new ArrayList<>();
try (BufferedReader bufferedReader = new BufferedReader(new FileReader(mergeRecord))) {
- bufferedReader.lines().forEach(line -> filePaths.add(line));
+ bufferedReader.lines().forEach(filePaths::add);
}
// the last file should be the merged new file
String lastFilePath = filePaths.get(filePaths.size() - 1);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/RestorableTsFileIOWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/RestorableTsFileIOWriter.java
index 7fa9190..bcf15ba 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/RestorableTsFileIOWriter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/RestorableTsFileIOWriter.java
@@ -323,4 +323,12 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
return out;
}
+ public long getMetaSize() {
+ long size = 0;
+ for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
+ size += chunkGroupMetaData.getChunkMetaDataList().size() *
+ ChunkMetaData.estimatedObjectByteSize;
+ }
+ return size;
+ }
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index b08f869..47f9a16 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@ -122,7 +122,7 @@ public class TsFileProcessor extends Processor {
private List<TsFileResource> tsFileResources;
// device -> datafiles
- private Map<String, List<TsFileResource>> inverseIndexOfResource;
+ private Map<String, List<TsFileResource>> inverseIndexOfResource = new HashMap<>();
// device -> min timestamp in current data file
private Map<String, Long> minWrittenTimeForEachDeviceInCurrentFile;
@@ -555,7 +555,7 @@ public class TsFileProcessor extends Processor {
result = true;
} catch (Exception e) {
LOGGER.error(
- "The TsFile Processor {} failed to flush, when calling the afterFlushAction(filenodeFlushAction).",
+ "The TsFile Processor {} failed to flush.",
processorName, e);
result = false;
} finally {
@@ -794,6 +794,9 @@ public class TsFileProcessor extends Processor {
@Override
public void close() throws TsFileProcessorException {
+ if (isClosed) {
+ return;
+ }
closeCurrentFile();
try {
if (currentResource != null) {
@@ -959,6 +962,10 @@ public class TsFileProcessor extends Processor {
return fileSize;
}
+ public long currentFileSize() {
+ return currentResource != null ? currentResource.getFile().length() : 0;
+ }
+
public List<TsFileResource> getTsFileResources() {
return tsFileResources;
}
@@ -988,4 +995,8 @@ public class TsFileProcessor extends Processor {
}
tsFileResources = newFiles;
}
+
+ public long getMetaSize() {
+ return writer.getMetaSize();
+ }
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/BufferWriteProcessorException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/BufferWriteProcessorException.java
deleted file mode 100644
index bf6a349..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/BufferWriteProcessorException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.exception;
-
-public class BufferWriteProcessorException extends ProcessorException {
-
- private static final long serialVersionUID = 6817880163296469038L;
-
- public BufferWriteProcessorException() {
- super();
- }
-
- public BufferWriteProcessorException(Exception pathExcp) {
- super(pathExcp.getMessage());
- }
-
- public BufferWriteProcessorException(String msg) {
- super(msg);
- }
-
- public BufferWriteProcessorException(Throwable throwable) {
- super(throwable.getMessage());
- }
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/ErrorDebugException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/ErrorDebugException.java
deleted file mode 100644
index 9dff25f..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/ErrorDebugException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.exception;
-
-/**
- * ErrorDebugException.
- *
- * @author kangrong
- */
-public class ErrorDebugException extends RuntimeException {
-
- private static final long serialVersionUID = -1123099620556170447L;
-
- public ErrorDebugException(String msg) {
- super(msg);
- }
-
- public ErrorDebugException(Exception e) {
- super(e);
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeNotExistException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeNotExistException.java
deleted file mode 100644
index 6463f47..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeNotExistException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.exception;
-
-/**
- * Throw this exception when the file node processor is not exists.
- *
- * @author kangrong
- */
-public class FileNodeNotExistException extends RuntimeException {
-
- private static final long serialVersionUID = -4334041411884083545L;
-
- public FileNodeNotExistException(String msg) {
- super(msg);
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
deleted file mode 100644
index d3cf362..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.exception;
-
-public class FileNodeProcessorException extends ProcessorException {
-
- private static final long serialVersionUID = 7373978140952977661L;
-
- public FileNodeProcessorException() {
- super();
- }
-
- public FileNodeProcessorException(PathErrorException pathExcp) {
- super(pathExcp.getMessage());
- }
-
- public FileNodeProcessorException(String msg) {
- super(msg);
- }
-
- public FileNodeProcessorException(Throwable throwable) {
- super(throwable.getMessage());
- }
-
- public FileNodeProcessorException(String msg, Throwable e) {
- super(msg, e);
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/NotConsistentException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/NotConsistentException.java
deleted file mode 100644
index c570a0b..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/NotConsistentException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.exception;
-
-public class NotConsistentException extends Exception {
-
- private static final long serialVersionUID = 5176858298038267828L;
-
- public NotConsistentException(String msg) {
- super(msg);
- }
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/OverflowProcessorException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/OverflowProcessorException.java
deleted file mode 100644
index 21ac261..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/OverflowProcessorException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.exception;
-
-public class OverflowProcessorException extends ProcessorException {
-
- private static final long serialVersionUID = -2784502746101925819L;
-
- public OverflowProcessorException() {
- super();
- }
-
- public OverflowProcessorException(PathErrorException pathExcp) {
-
- }
-
- public OverflowProcessorException(String msg) {
- super(msg);
- }
-
- public OverflowProcessorException(Throwable throwable) {
- super(throwable.getMessage());
- }
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/OverflowWrongParameterException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/OverflowWrongParameterException.java
deleted file mode 100644
index e0bd2a5..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/OverflowWrongParameterException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.exception;
-
-/**
- * Used for IntervalTree pass wrong parameters. <br> e.g. TSDataType inconsistent; e.g. start time
- * is greater than end time.
- *
- * @author CGF
- */
-public class OverflowWrongParameterException extends DeltaEngineRunningException {
-
- private static final long serialVersionUID = 5386506095896639099L;
-
- public OverflowWrongParameterException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public OverflowWrongParameterException(String message) {
- super(message);
- }
-
- public OverflowWrongParameterException(Throwable cause) {
- super(cause);
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/ProcessorRuntimException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/ProcessorRuntimException.java
deleted file mode 100644
index 6f4fe91..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/ProcessorRuntimException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.exception;
-
-/**
- * @author liukun
- *
- */
-public class ProcessorRuntimException extends RuntimeException {
-
- private static final long serialVersionUID = -5543549255867713835L;
-
- public ProcessorRuntimException() {
- super();
- }
-
- public ProcessorRuntimException(String message) {
- super(message);
- }
-
- public ProcessorRuntimException(Throwable cause) {
- super(cause);
- }
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/DeltaEngineRunningException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/QueryEngineRunningException.java
similarity index 77%
rename from iotdb/src/main/java/org/apache/iotdb/db/exception/DeltaEngineRunningException.java
rename to iotdb/src/main/java/org/apache/iotdb/db/exception/QueryEngineRunningException.java
index acae81c..76959f8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/DeltaEngineRunningException.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/QueryEngineRunningException.java
@@ -19,28 +19,28 @@
package org.apache.iotdb.db.exception;
/**
- * This Exception is the parent class for all delta engine runtime exceptions.<br> This Exception
+ * This Exception is the parent class for all query engine runtime exceptions.<br> This Exception
* extends super class {@link java.lang.RuntimeException}
*
* @author CGF
*/
-public abstract class DeltaEngineRunningException extends RuntimeException {
+public abstract class QueryEngineRunningException extends RuntimeException {
private static final long serialVersionUID = 7537799061005397794L;
- public DeltaEngineRunningException() {
+ public QueryEngineRunningException() {
super();
}
- public DeltaEngineRunningException(String message, Throwable cause) {
+ public QueryEngineRunningException(String message, Throwable cause) {
super(message, cause);
}
- public DeltaEngineRunningException(String message) {
+ public QueryEngineRunningException(String message) {
super(message);
}
- public DeltaEngineRunningException(Throwable cause) {
+ public QueryEngineRunningException(Throwable cause) {
super(cause);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/TooManyChunksException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/TooManyChunksException.java
deleted file mode 100644
index 1e21c89..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/TooManyChunksException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.exception;
-
-import java.io.File;
-import java.util.Arrays;
-
-/**
- * once a TooManyChunksException is thrown, either enlarge the memory resource for queries, or begin
- * to merge the files in this exception.
- */
-public class TooManyChunksException extends Exception {
-
- File[] files;
-
- public TooManyChunksException(File[] files, String device, String measurment,
- long numberOfWays) {
- super(String.format(
- "Files (%s) has too many Chunks for %s' %s, while query is only allocated %d Chunk spaces. Pls merge the OF file frist before you use it",
- Arrays.toString(files), device, measurment, numberOfWays));
- this.files = files;
- }
-
- public File[] getFiles() {
- return files;
- }
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/UnSupportedFillTypeException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/UnSupportedFillTypeException.java
index cf78eba..a46cffc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/UnSupportedFillTypeException.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/UnSupportedFillTypeException.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.exception;
-public class UnSupportedFillTypeException extends DeltaEngineRunningException {
+public class UnSupportedFillTypeException extends QueryEngineRunningException {
public UnSupportedFillTypeException(String message, Throwable cause) {
super(message, cause);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/UnSupportedOverflowOpTypeException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/UnSupportedOverflowOpTypeException.java
deleted file mode 100644
index 3253a9a..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/UnSupportedOverflowOpTypeException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.exception;
-
-/**
- * OverflowOpType could only be INSERT, UPDATE, DELETE. The other situations would throw this
- * exception.
- *
- * @author CGF
- */
-public class UnSupportedOverflowOpTypeException extends DeltaEngineRunningException {
-
- private static final long serialVersionUID = -3834482432038784174L;
-
- public UnSupportedOverflowOpTypeException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public UnSupportedOverflowOpTypeException(String message) {
- super(message);
- }
-
- public UnSupportedOverflowOpTypeException(Throwable cause) {
- super(cause);
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java
index ec97a0d..e57e403 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java
@@ -99,7 +99,7 @@ public class MonitorConstants {
OVERFLOW(Monitor.INSTANCE.getBaseDirectory() + File.separatorChar + "overflow"),
SETTLED(Monitor.INSTANCE.getBaseDirectory() + File.separatorChar + "settled"),
WAL(new File(config.getWalFolder()).getAbsolutePath()),
- INFO(new File(config.getFileNodeDir()).getAbsolutePath()),
+ INFO(new File(config.getStorageGroupDir()).getAbsolutePath()),
SCHEMA(new File(config.getMetadataDir()).getAbsolutePath());
public String getPath() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index b30da6c..f3ae094 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -116,7 +116,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
return flag;
case INSERT:
InsertPlan insert = (InsertPlan) plan;
- multiInsert(insert);
+ insert(insert);
return true;
case CREATE_ROLE:
case DELETE_ROLE:
@@ -241,7 +241,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
}
@Override
- public void multiInsert(InsertPlan plan)
+ public void insert(InsertPlan plan)
throws ProcessorException {
try {
String deviceId = plan.getDeviceId();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index d139f65..0428801 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -177,7 +177,7 @@ public abstract class QueryProcessExecutor {
*
* @param plan the InsertPlan
*/
- public abstract void multiInsert(InsertPlan plan) throws ProcessorException;
+ public abstract void insert(InsertPlan plan) throws ProcessorException;
public abstract List<String> getAllPaths(String originPath) throws PathErrorException;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 3e0c13a..dde5a9d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -37,6 +37,10 @@ public class InsertPlan extends PhysicalPlan {
// 1 : BufferWrite Insert 2 : Overflow Insert
private int insertType;
+ public InsertPlan(String deviceId, long insertTime, String measurement, String value) {
+ this(deviceId, insertTime, new String[]{measurement}, new String[]{value});
+ }
+
public InsertPlan(String deviceId, long insertTime, String[] measurementList,
String[] insertValues) {
super(false, Operator.OperatorType.INSERT);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index 461ff0b..f2a3782 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -143,6 +143,7 @@ public class FileReaderManager implements IService {
return tsFileReader;
}
+
return readerMap.get(filePath);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 013c0bf..7bf37c7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.factory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.db.engine.datasource.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -35,6 +36,7 @@ import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.reader.AllDataReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.IReader;
+import org.apache.iotdb.db.query.reader.mem.MemChunkReader;
import org.apache.iotdb.db.query.reader.mem.MemChunkReaderByTimestamp;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
@@ -48,6 +50,7 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerier;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
@@ -80,32 +83,56 @@ public class SeriesReaderFactory {
int priorityValue = 1;
for (TsFileResource overflowFile : overflowSeriesDataSource.getSealedFiles()) {
-
TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
.get(overflowFile.getFilePath(), true);
ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileSequenceReader);
List<ChunkMetaData> metaDataList = readFileChunkMetadata(tsFileSequenceReader, context,
overflowSeriesDataSource.getSeriesPath(), overflowFile.getModFile());
- for (ChunkMetaData chunkMetaData : metaDataList) {
- Chunk chunk = chunkLoader.getChunk(chunkMetaData);
- ChunkReader chunkReader;
- if (filter == null) {
- chunkReader = new ChunkReaderWithoutFilter(chunk);
- } else {
- chunkReader = new ChunkReaderWithFilter(chunk, filter);
- }
+ priorityValue = createReadersForChunk(metaDataList, chunkLoader, filter, unSeqMergeReader,
+ tsFileSequenceReader, priorityValue);
+ }
+ if (overflowSeriesDataSource.hasUnsealedFile()) {
+ TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
+ .get(overflowSeriesDataSource.getUnsealedFile().getFilePath(), true);
+ ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileSequenceReader);
+ List<ChunkMetaData> metaDataList =
+ overflowSeriesDataSource.getUnsealedFile().getChunkMetaDataList();
- unSeqMergeReader
- .addReaderWithPriority(new EngineChunkReader(chunkReader, tsFileSequenceReader),
- priorityValue);
- priorityValue++;
- }
+ priorityValue = createReadersForChunk(metaDataList, chunkLoader, filter, unSeqMergeReader,
+ tsFileSequenceReader, priorityValue);
+ }
+ if (overflowSeriesDataSource.hasRawSeriesChunk()) {
+ ReadOnlyMemChunk chunk = overflowSeriesDataSource.getReadableChunk();
+ unSeqMergeReader
+ .addReaderWithPriority(new MemChunkReader(chunk, filter),
+ priorityValue);
}
return unSeqMergeReader;
}
+ private int createReadersForChunk(List<ChunkMetaData> metaDataList, ChunkLoader chunkLoader,
+ Filter filter, PriorityMergeReader unSeqMergeReader,
+ TsFileSequenceReader tsFileSequenceReader, int priorityValue)
+ throws IOException {
+ for (ChunkMetaData chunkMetaData : metaDataList) {
+ Chunk chunk = chunkLoader.getChunk(chunkMetaData);
+ ChunkReader chunkReader;
+ if (filter == null) {
+ chunkReader = new ChunkReaderWithoutFilter(chunk);
+ } else {
+ chunkReader = new ChunkReaderWithFilter(chunk, filter);
+ }
+
+ unSeqMergeReader
+ .addReaderWithPriority(new EngineChunkReader(chunkReader, tsFileSequenceReader),
+ priorityValue);
+ priorityValue++;
+ }
+ return priorityValue;
+ }
+
// TODO createUnSeqMergeReaderByTime a method with filter
/**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
index e38d673..3e3768c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.IBatchReader;
import org.apache.iotdb.db.utils.QueryUtils;
@@ -123,9 +124,6 @@ public class SealedTsFilesReader implements IBatchReader, IAggregateReader {
private void initSingleTsFileReader(TsFileResource tsfile, QueryContext context)
throws IOException {
- if (seriesReader != null) {
- seriesReader.close();
- }
seriesReader = QueryUtils.createTsFileReader(tsfile, context, seriesPath, isReverse, filter);
}
@@ -135,10 +133,8 @@ public class SealedTsFilesReader implements IBatchReader, IAggregateReader {
}
@Override
- public void close() throws IOException {
- if (seriesReader != null) {
- seriesReader.close();
- }
+ public void close() {
+ // do not close lower reader because other thread may be using it
}
@Override
@@ -150,4 +146,5 @@ public class SealedTsFilesReader implements IBatchReader, IAggregateReader {
public void skipPageData() {
seriesReader.skipPageData();
}
+
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderByTimestamp.java
index 5778c50..136d8f2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderByTimestamp.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderByTimestamp.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.query.reader.sequence;
+import static org.apache.iotdb.db.utils.QueryUtils.queryChunks;
+
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
@@ -34,6 +36,7 @@ import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.iotdb.tsfile.read.reader.series.SeriesReaderByTimestamp;
+import org.apache.iotdb.tsfile.utils.Pair;
public class SealedTsFilesReaderByTimestamp implements EngineReaderByTimeStamp {
@@ -116,21 +119,8 @@ public class SealedTsFilesReaderByTimestamp implements EngineReaderByTimeStamp {
throws IOException {
// to avoid too many opened files
- TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
- .get(fileNode.getFilePath(), true);
-
- MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
- List<ChunkMetaData> metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
-
- List<Modification> pathModifications = context.getPathModifications(fileNode.getModFile(),
- seriesPath.getFullPath());
- if (!pathModifications.isEmpty()) {
- QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
- }
- ChunkLoader chunkLoader = new ChunkLoaderImpl(tsFileReader);
-
- seriesReader = new SeriesReaderByTimestamp(chunkLoader, metaDataList);
-
+ Pair<ChunkLoader, List<ChunkMetaData>> chunkRst = queryChunks(fileNode, seriesPath, context);
+ seriesReader = new SeriesReaderByTimestamp(chunkRst.left, chunkRst.right);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java
index 8948488..6ee6ac5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java
@@ -44,14 +44,14 @@ public class UnSealedTsFileReader implements IBatchReader, IAggregateReader {
private FileSeriesReader unSealedReader;
/**
- * Construct funtion for UnSealedTsFileReader.
+ * Construct function for UnSealedTsFileReader.
*
* @param unsealedTsFile -param to initial
* @param filter -filter
* @param isReverse true-traverse chunks from behind forward; false-traverse chunks from front to
* back;
*/
- public UnSealedTsFileReader(UnsealedTsFile unsealedTsFile, Filter filter, boolean isReverse)
+ UnSealedTsFileReader(UnsealedTsFile unsealedTsFile, Filter filter, boolean isReverse)
throws IOException {
TsFileSequenceReader unClosedTsFileReader = FileReaderManager.getInstance()
@@ -78,10 +78,8 @@ public class UnSealedTsFileReader implements IBatchReader, IAggregateReader {
}
@Override
- public void close() throws IOException {
- if (unSealedReader != null) {
- unSealedReader.close();
- }
+ public void close() {
+ // do not close lower reader because it is shared
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
index a7292bf..552c353 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
@@ -556,7 +556,7 @@ public class SyncServiceImpl implements SyncService.Iface {
}
}
insertExecutor
- .multiInsert(new InsertPlan(deviceId, record.getTimestamp(),
+ .insert(new InsertPlan(deviceId, record.getTimestamp(),
measurementList.toArray(new String[]{}), insertValues.toArray(new String[]{})));
}
}
@@ -631,13 +631,13 @@ public class SyncServiceImpl implements SyncService.Iface {
/** If there has no overlap data with the timeseries, inserting all data in the sync file **/
if (originDataPoints.isEmpty()) {
for (InsertPlan insertPlan : newDataPoints) {
- insertExecutor.multiInsert(insertPlan);
+ insertExecutor.insert(insertPlan);
}
} else {
/** Compare every data to get valid data **/
for (InsertPlan insertPlan : newDataPoints) {
if (!originDataPoints.contains(insertPlan)) {
- insertExecutor.multiInsert(insertPlan);
+ insertExecutor.insert(insertPlan);
}
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/OpenFileNumUtil.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/OpenFileNumUtil.java
index 8709b0a..98c5e10 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/OpenFileNumUtil.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/OpenFileNumUtil.java
@@ -260,7 +260,7 @@ public class OpenFileNumUtil {
OVERFLOW_OPEN_FILE_NUM(directories.getAllOverflowFileFolders()),
WAL_OPEN_FILE_NUM(Collections.singletonList(config.getWalFolder())),
METADATA_OPEN_FILE_NUM(Collections.singletonList(config.getMetadataDir())),
- DIGEST_OPEN_FILE_NUM(Collections.singletonList(config.getFileNodeDir())),
+ DIGEST_OPEN_FILE_NUM(Collections.singletonList(config.getStorageGroupDir())),
SOCKET_OPEN_FILE_NUM(null);
// path is a list of directory corresponding to the OpenFileNumStatistics enum element,
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
index 8b2d66f..9a9f174 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithFilter;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithoutFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
public class QueryUtils {
@@ -95,6 +96,22 @@ public class QueryUtils {
FileSeriesReader seriesReader;
// to avoid too many opened files
+ Pair<ChunkLoader, List<ChunkMetaData>> chunkRst = queryChunks(tsfile, path, context);
+
+ if (isReverse) {
+ Collections.reverse(chunkRst.right);
+ }
+
+ if (filter == null) {
+ seriesReader = new FileSeriesReaderWithoutFilter(chunkRst.left, chunkRst.right);
+ } else {
+ seriesReader = new FileSeriesReaderWithFilter(chunkRst.left, chunkRst.right, filter);
+ }
+ return seriesReader;
+ }
+
+ public static Pair<ChunkLoader, List<ChunkMetaData>> queryChunks(TsFileResource tsfile,
+ Path path, QueryContext context) throws IOException {
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
.get(tsfile.getFilePath(), true);
@@ -108,16 +125,6 @@ public class QueryUtils {
}
ChunkLoader chunkLoader = new ChunkLoaderImpl(tsFileReader);
-
- if (isReverse) {
- Collections.reverse(metaDataList);
- }
-
- if (filter == null) {
- seriesReader = new FileSeriesReaderWithoutFilter(chunkLoader, metaDataList);
- } else {
- seriesReader = new FileSeriesReaderWithFilter(chunkLoader, metaDataList, filter);
- }
- return seriesReader;
+ return new Pair<>(chunkLoader, metaDataList);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index ec8527a..fd1fa8e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -147,8 +147,8 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
@Override
public void close() {
- if (!isActivated(syncThread) && !isActivated(forceThread)) {
- logger.error("MultiFileLogNodeManager has not yet started");
+ if (!isActivated(syncThread) && !isActivated(forceThread) && nodeMap.isEmpty()) {
+ logger.warn("MultiFileLogNodeManager has not yet started and there is no node to close");
return;
}
logger.info("LogNodeManager starts closing..");
@@ -233,9 +233,6 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
@Override
public void stop() {
- if (!config.isEnableWal()) {
- return;
- }
close();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
index 9ea6ed3..8eb19ef 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
@@ -53,7 +53,7 @@ public class ConcreteLogReplayer implements LogReplayer {
}
} catch (Exception e) {
throw new ProcessorException(
- String.format("Cannot replay log %s, because %s", plan.toString(), e.getMessage()));
+ String.format("Cannot replay log %s", plan.toString()), e);
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/PathUtils.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/PathUtils.java
index dcf9b52..209e8e3 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/PathUtils.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/PathUtils.java
@@ -51,7 +51,7 @@ public class PathUtils {
}
public static File getFileNodeDir(String nameSpacePath) {
- String filenodeDir = config.getFileNodeDir();
+ String filenodeDir = config.getStorageGroupDir();
if (filenodeDir.length() > 0
&& filenodeDir.charAt(filenodeDir.length() - 1) != File.separatorChar) {
filenodeDir = filenodeDir + File.separatorChar;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/ProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/ProcessorTest.java
index 75fa7d1..e2dfcfb 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/ProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/ProcessorTest.java
@@ -20,10 +20,10 @@ package org.apache.iotdb.db.engine;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
-import java.io.IOException;
import java.util.concurrent.Future;
-import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.ImmediateFuture;
import org.junit.After;
@@ -36,9 +36,9 @@ import org.junit.Test;
*/
public class ProcessorTest {
- TestLRUProcessor processor1;
- TestLRUProcessor processor2;
- TestLRUProcessor processor3;
+ private TestLRUProcessor processor1;
+ private TestLRUProcessor processor2;
+ private TestLRUProcessor processor3;
@Before
public void setUp() throws Exception {
@@ -55,7 +55,7 @@ public class ProcessorTest {
@Test
public void testEquals() {
assertEquals(processor1, processor3);
- assertFalse(processor1.equals(processor2));
+ assertNotEquals(processor1, processor2);
}
@Test
@@ -66,13 +66,13 @@ public class ProcessorTest {
Thread.sleep(100);
- assertEquals(false, processor1.tryReadLock());
- assertEquals(false, processor1.tryLock(true));
+ assertFalse(processor1.tryReadLock());
+ assertFalse(processor1.tryLock(true));
Thread.sleep(2000);
- assertEquals(true, processor1.tryLock(true));
- assertEquals(true, processor1.tryLock(false));
+ assertTrue(processor1.tryLock(true));
+ assertTrue(processor1.tryLock(false));
processor1.readUnlock();
processor1.writeUnlock();
@@ -81,19 +81,19 @@ public class ProcessorTest {
thread2.start();
Thread.sleep(100);
- assertEquals(false, processor1.tryWriteLock());
- assertEquals(true, processor1.tryReadLock());
+ assertFalse(processor1.tryWriteLock());
+ assertTrue(processor1.tryReadLock());
Thread.sleep(1500);
- assertEquals(false, processor1.tryWriteLock());
+ assertFalse(processor1.tryWriteLock());
processor1.readUnlock();
- assertEquals(true, processor1.tryWriteLock());
+ assertTrue(processor1.tryWriteLock());
processor1.writeUnlock();
}
class TestLRUProcessor extends Processor {
- public TestLRUProcessor(String nameSpacePath) {
+ TestLRUProcessor(String nameSpacePath) {
super(nameSpacePath);
}
@@ -103,12 +103,12 @@ public class ProcessorTest {
}
@Override
- public void close() throws ProcessorException {
+ public void close() {
}
@Override
- public Future<Boolean> flush() throws IOException {
+ public Future<Boolean> flush() {
return new ImmediateFuture<>(true);
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
deleted file mode 100644
index 906bef9..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.bufferwrite;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.io.FileUtils;
-import org.apache.iotdb.db.engine.EngineConstants;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-/**
- * BufferWrite insert Benchmark. This class is used to bench Bufferwrite module and gets its
- * performance.
- */
-public class BufferWriteBenchmark {
-
- private static int numOfDevice = 100;
- private static int numOfMeasurement = 100;
- private static int numOfPoint = 1000;
-
- private static String[] deviceIds = new String[numOfDevice];
- private static String[] measurementIds = new String[numOfMeasurement];
- private static FileSchema fileSchema = new FileSchema();
- private static TSDataType tsDataType = TSDataType.INT64;
-
- static {
- for (int i = 0; i < numOfDevice; i++) {
- deviceIds[i] = String.valueOf("d" + i);
- }
- }
-
- static {
- for (int i = 0; i < numOfMeasurement; i++) {
- measurementIds[i] = String.valueOf("m" + i);
- MeasurementSchema measurementDescriptor = new MeasurementSchema("m" + i, tsDataType,
- TSEncoding.PLAIN);
- assert measurementDescriptor.getCompressor() != null;
- fileSchema.registerMeasurement(measurementDescriptor);
-
- }
- }
-
- private static void before() throws IOException {
- FileUtils.deleteDirectory(new File("BufferBenchmark"));
- }
-
- private static void after() throws IOException {
- FileUtils.deleteDirectory(new File("BufferBenchmark"));
- }
-
- public static void main(String[] args) throws BufferWriteProcessorException, IOException {
- before();
- Map<String, Action> parameters = new HashMap<>();
- parameters.put(EngineConstants.BUFFERWRITE_FLUSH_ACTION, new Action() {
- @Override
- public void act() throws ActionException {
- System.out.println(EngineConstants.BUFFERWRITE_FLUSH_ACTION);
- }
- });
- parameters.put(EngineConstants.BUFFERWRITE_CLOSE_ACTION, new Action() {
- @Override
- public void act() throws ActionException {
- System.out.println(EngineConstants.BUFFERWRITE_CLOSE_ACTION);
- }
- });
- parameters.put(EngineConstants.FILENODE_PROCESSOR_FLUSH_ACTION, new Action() {
- @Override
- public void act() throws ActionException {
- System.out.println(EngineConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
- }
- });
-
- BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor("BufferBenchmark",
- "bench", "benchFile",
- parameters, SysTimeVersionController.INSTANCE, fileSchema);
-
- long startTime = System.currentTimeMillis();
- for (int i = 0; i < numOfPoint; i++) {
- for (int j = 0; j < numOfDevice; j++) {
- TSRecord tsRecord = getRecord(deviceIds[j]);
- bufferWriteProcessor.write(tsRecord);
- }
- }
- long endTime = System.currentTimeMillis();
- bufferWriteProcessor.close();
- System.out.println(String.format(
- "Num of time series: %d, " + "Num of points for each time series: %d, "
- + "The total time: %d ms. ",
- numOfMeasurement * numOfDevice, numOfPoint, endTime - startTime));
-
- after();
- }
-
- private static TSRecord getRecord(String deviceId) {
- long time = System.nanoTime();
- long value = System.nanoTime();
- TSRecord tsRecord = new TSRecord(time, deviceId);
- for (String measurement : measurementIds) {
- tsRecord.addTuple(new LongDataPoint(measurement, value));
- }
- return tsRecord;
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
deleted file mode 100644
index bec0052..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.bufferwrite;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.EngineConstants;
-import org.apache.iotdb.db.engine.MetadataManagerHelper;
-import org.apache.iotdb.db.engine.datasource.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.utils.FileSchemaUtils;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BufferWriteProcessorNewTest {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessorNewTest.class);
- Action bfflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
-
- }
- };
-
- Action bfcloseaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
-
- Action fnflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
-
- }
- };
- Map<String, Action> parameters = new HashMap<>();
- private String processorName = "root.vehicle.d0";
- private String measurementId = "s0";
- private TSDataType dataType = TSDataType.INT32;
- private Map<String, String> props = Collections.emptyMap();
- private BufferWriteProcessor bufferwrite;
- private String filename = "tsfile";
-
- @Before
- public void setUp() throws Exception {
- parameters.put(EngineConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
- parameters.put(EngineConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
- parameters.put(EngineConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
- MetadataManagerHelper.initMetadata();
- EnvironmentUtils.envSetUp();
- }
-
- @After
- public void tearDown() throws Exception {
- bufferwrite.close();
- EnvironmentUtils.cleanEnv();
- }
-
- @Test
- public void testWriteAndFlush()
- throws BufferWriteProcessorException, WriteProcessException, IOException, InterruptedException {
- bufferwrite = new BufferWriteProcessor(Directories.getInstance().getTsFolderForTest(),
- processorName, filename,
- parameters, SysTimeVersionController.INSTANCE,
- FileSchemaUtils.constructFileSchema(processorName));
- assertEquals(processorName + File.separator + filename, bufferwrite.getFileRelativePath());
- assertTrue(bufferwrite.isNewProcessor());
- bufferwrite.setNewProcessor(false);
- assertFalse(bufferwrite.isNewProcessor());
- Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferwrite
- .queryBufferWriteData(processorName,
- measurementId, dataType, props);
- ReadOnlyMemChunk left = pair.left;
- List<ChunkMetaData> right = pair.right;
- assertTrue(left.isEmpty());
- assertEquals(0, right.size());
- for (int i = 1; i <= 100; i++) {
- bufferwrite.write(processorName, measurementId, i, dataType, String.valueOf(i));
- }
- // query data in memory
- pair = bufferwrite.queryBufferWriteData(processorName, measurementId, dataType, props);
- left = pair.left;
- right = pair.right;
- assertFalse(left.isEmpty());
- int num = 1;
- Iterator<TimeValuePair> iterator = left.getIterator();
- for (; num <= 100; num++) {
- iterator.hasNext();
- TimeValuePair timeValuePair = iterator.next();
- assertEquals(num, timeValuePair.getTimestamp());
- assertEquals(num, timeValuePair.getValue().getInt());
- }
- assertFalse(bufferwrite.isFlush());
- long lastFlushTime = bufferwrite.getLastFlushTime();
- // flush asynchronously
- bufferwrite.flush();
- assertTrue(bufferwrite.getLastFlushTime() != lastFlushTime);
- assertTrue(bufferwrite.canBeClosed());
- // waiting for the end of flush.
- try {
- bufferwrite.getFlushFuture().get(10, TimeUnit.SECONDS);
- } catch (Exception e) {
- //because UT uses a mock flush operation, 10 seconds should be enough.
- LOGGER.error(e.getMessage(), e);
- Assert.fail("mock flush spends more than 10 seconds... "
- + "Please modify the value or change a better test environment");
- }
- pair = bufferwrite.queryBufferWriteData(processorName, measurementId, dataType, props);
- left = pair.left;
- right = pair.right;
- assertTrue(left.isEmpty());
- assertEquals(1, right.size());
- assertEquals(measurementId, right.get(0).getMeasurementUid());
- assertEquals(dataType, right.get(0).getTsDataType());
-
- // test recovery
- BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor(
- Directories.getInstance().getTsFolderForTest(), processorName, filename, parameters,
- SysTimeVersionController.INSTANCE,
- FileSchemaUtils.constructFileSchema(processorName));
- pair = bufferWriteProcessor.queryBufferWriteData(processorName, measurementId, dataType, props);
- left = pair.left;
- right = pair.right;
- assertTrue(left.isEmpty());
- assertEquals(1, right.size());
- assertEquals(measurementId, right.get(0).getMeasurementUid());
- assertEquals(dataType, right.get(0).getTsDataType());
- bufferWriteProcessor.close();
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
deleted file mode 100644
index b44555d..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.bufferwrite;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.EngineConstants;
-import org.apache.iotdb.db.engine.MetadataManagerHelper;
-import org.apache.iotdb.db.engine.PathUtils;
-import org.apache.iotdb.db.engine.datasource.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.utils.FileSchemaUtils;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BufferWriteProcessorTest {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessorTest.class);
-
- Action bfflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
-
- }
- };
-
- Action bfcloseaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
-
- Action fnflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
-
- }
- };
-
- private int groupSizeInByte;
- private TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
- private Map<String, Action> parameters = new HashMap<>();
- private BufferWriteProcessor bufferwrite;
- private Directories directories = Directories.getInstance();
- private String deviceId = "root.vehicle.d0";
- private String measurementId = "s0";
- private TSDataType dataType = TSDataType.INT32;
-
- private String insertPath = "insertPath";
-
- @Before
- public void setUp() throws Exception {
- parameters.put(EngineConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
- parameters.put(EngineConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
- parameters.put(EngineConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
- // origin value
- groupSizeInByte = TsFileConf.groupSizeInByte;
- // new value
- TsFileConf.groupSizeInByte = 1024;
- // init metadata
- MetadataManagerHelper.initMetadata();
- EnvironmentUtils.envSetUp();
- }
-
- @After
- public void tearDown() throws Exception {
- // recovery value
- TsFileConf.groupSizeInByte = groupSizeInByte;
- // clean environment
- EnvironmentUtils.cleanEnv();
- EnvironmentUtils.cleanDir(insertPath);
- }
-
- @Test
- public void testWriteAndAbnormalRecover()
- throws WriteProcessException, InterruptedException, IOException, ProcessorException {
- bufferwrite = new BufferWriteProcessor(directories.getTsFolderForTest(), deviceId, insertPath,
- parameters, SysTimeVersionController.INSTANCE,
- FileSchemaUtils.constructFileSchema(deviceId));
- for (int i = 1; i < 100; i++) {
- bufferwrite.write(deviceId, measurementId, i, dataType, String.valueOf(i));
- }
- // waiting for the end of flush
- TimeUnit.SECONDS.sleep(2);
- File dataFile = PathUtils.getBufferWriteDir(deviceId);
- // check file
- String restoreFilePath = insertPath + ".restore";
- File restoreFile = new File(dataFile, restoreFilePath);
- assertTrue(restoreFile.exists());
- File insertFile = new File(dataFile, insertPath);
- long insertFileLength = insertFile.length();
- FileOutputStream fileOutputStream = new FileOutputStream(insertFile.getPath(), true);
- fileOutputStream.write(new byte[20]);
- fileOutputStream.close();
- assertEquals(insertFileLength + 20, insertFile.length());
- // copy restore file
- File file = new File("temp");
- restoreFile.renameTo(file);
- bufferwrite.close();
- file.renameTo(restoreFile);
- BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor(
- directories.getTsFolderForTest(), deviceId,
- insertPath, parameters, SysTimeVersionController.INSTANCE,
- FileSchemaUtils.constructFileSchema(deviceId));
- assertTrue(insertFile.exists());
- assertEquals(insertFileLength, insertFile.length());
- Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferWriteProcessor
- .queryBufferWriteData(deviceId, measurementId, dataType, Collections.emptyMap());
- assertTrue(pair.left.isEmpty());
- assertEquals(1, pair.right.size());
- ChunkMetaData chunkMetaData = pair.right.get(0);
- assertEquals(measurementId, chunkMetaData.getMeasurementUid());
- assertEquals(dataType, chunkMetaData.getTsDataType());
- bufferWriteProcessor.close();
- assertFalse(restoreFile.exists());
- }
-
- @Test
- public void testWriteAndNormalRecover()
- throws WriteProcessException, ProcessorException, InterruptedException {
- bufferwrite = new BufferWriteProcessor(directories.getTsFolderForTest(), deviceId, insertPath,
- parameters, SysTimeVersionController.INSTANCE,
- FileSchemaUtils.constructFileSchema(deviceId));
- for (int i = 1; i < 100; i++) {
- bufferwrite.write(deviceId, measurementId, i, dataType, String.valueOf(i));
- }
- // waiting for the end of flush
- TimeUnit.SECONDS.sleep(2);
- File dataFile = PathUtils.getBufferWriteDir(deviceId);
- // check file
- String restoreFilePath = insertPath + ".restore";
- File restoreFile = new File(dataFile, restoreFilePath);
- assertTrue(restoreFile.exists());
- BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor(
- directories.getTsFolderForTest(), deviceId,
- insertPath, parameters, SysTimeVersionController.INSTANCE,
- FileSchemaUtils.constructFileSchema(deviceId));
- Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferWriteProcessor
- .queryBufferWriteData(deviceId, measurementId, dataType, Collections.emptyMap());
- assertTrue(pair.left.isEmpty());
- assertEquals(1, pair.right.size());
- ChunkMetaData chunkMetaData = pair.right.get(0);
- assertEquals(measurementId, chunkMetaData.getMeasurementUid());
- assertEquals(dataType, chunkMetaData.getTsDataType());
- bufferWriteProcessor.close();
- bufferwrite.close();
- assertFalse(restoreFile.exists());
- }
-
- @Test
- public void testWriteAndQuery()
- throws WriteProcessException, InterruptedException, ProcessorException {
- bufferwrite = new BufferWriteProcessor(directories.getTsFolderForTest(), deviceId, insertPath,
- parameters, SysTimeVersionController.INSTANCE,
- FileSchemaUtils.constructFileSchema(deviceId));
- assertFalse(bufferwrite.isFlush());
- assertTrue(bufferwrite.canBeClosed());
- assertEquals(0, bufferwrite.memoryUsage());
- assertEquals(TsFileIOWriter.magicStringBytes.length, bufferwrite.getFileSize());
- assertEquals(0, bufferwrite.getMetaSize());
- long lastFlushTime = bufferwrite.getLastFlushTime();
- for (int i = 1; i <= 85; i++) {
- bufferwrite.write(deviceId, measurementId, i, dataType, String.valueOf(i));
- assertEquals(i * 12, bufferwrite.memoryUsage());
- }
- assertEquals(lastFlushTime, bufferwrite.getLastFlushTime());
- bufferwrite.write(deviceId, measurementId, 86, dataType, String.valueOf(86));
- //assert a flush() is called.
- assertNotEquals(bufferwrite.getLastFlushTime(), lastFlushTime);
- // sleep to the end of flush
- try {
- bufferwrite.getFlushFuture().get(10, TimeUnit.SECONDS);
- } catch (Exception e) {
- //because UT uses a mock flush operation, 10 seconds should be enough.
- LOGGER.error(e.getMessage(), e);
- Assert.fail("mock flush spends more than 10 seconds... "
- + "Please modify the value or change a better test environment");
- }
- assertFalse(bufferwrite.isFlush());
- assertEquals(0, bufferwrite.memoryUsage());
- // query result
- Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferwrite
- .queryBufferWriteData(deviceId, measurementId,
- dataType, Collections.emptyMap());
- assertTrue(pair.left.isEmpty());
- assertEquals(1, pair.right.size());
- ChunkMetaData chunkMetaData = pair.right.get(0);
- assertEquals(measurementId, chunkMetaData.getMeasurementUid());
- assertEquals(dataType, chunkMetaData.getTsDataType());
- for (int i = 87; i <= 100; i++) {
- bufferwrite.write(deviceId, measurementId, i, dataType, String.valueOf(i));
- assertEquals((i - 86) * 12, bufferwrite.memoryUsage());
- }
- pair = bufferwrite
- .queryBufferWriteData(deviceId, measurementId, dataType, Collections.emptyMap());
- ReadOnlyMemChunk rawSeriesChunk = (ReadOnlyMemChunk) pair.left;
- assertFalse(rawSeriesChunk.isEmpty());
- assertEquals(87, rawSeriesChunk.getMinTimestamp());
- Assert.assertEquals(87, rawSeriesChunk.getValueAtMinTime().getInt());
- assertEquals(100, rawSeriesChunk.getMaxTimestamp());
- Assert.assertEquals(100, rawSeriesChunk.getValueAtMaxTime().getInt());
- Iterator<TimeValuePair> iterator = rawSeriesChunk.getIterator();
- for (int i = 87; i <= 100; i++) {
- iterator.hasNext();
- TimeValuePair timeValuePair = iterator.next();
- assertEquals(i, timeValuePair.getTimestamp());
- assertEquals(i, timeValuePair.getValue().getInt());
- }
- bufferwrite.close();
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
deleted file mode 100644
index 3054b05..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.bufferwrite;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
-import org.apache.iotdb.db.engine.memtable.MemTableTestUtils;
-import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
-import org.apache.iotdb.db.engine.tsfiledata.RestorableTsFileIOWriter;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class RestorableTsFileIOWriterTest {
-
- private RestorableTsFileIOWriter writer;
- private String processorName = "processor";
- private String insertPath = "insertfile";
- private String restorePath = insertPath + ".restore";
-
- @Before
- public void setUp() throws Exception {
-
- }
-
- @After
- public void tearDown() throws Exception {
- EnvironmentUtils.cleanDir(insertPath);
- EnvironmentUtils.cleanDir(restorePath);
- }
-
- @Test
- public void testInitResource() throws IOException {
- writer = new RestorableTsFileIOWriter(processorName, insertPath);
-
- Pair<Long, List<ChunkGroupMetaData>> pair = writer.readRestoreInfo();
- assertEquals(true, new File(restorePath).exists());
-
- assertEquals(TsFileIOWriter.magicStringBytes.length, (long) pair.left);
- assertEquals(0, pair.right.size());
- writer.endFile(new FileSchema());
- deleteInsertFile();
- assertEquals(false, new File(restorePath).exists());
- }
-
- @Test
- public void testAbnormalRecover() throws IOException {
- writer = new RestorableTsFileIOWriter(processorName, insertPath);
- File insertFile = new File(insertPath);
- File restoreFile = new File(restorePath);
- FileOutputStream fileOutputStream = new FileOutputStream(insertFile);
- // mkdir
- fileOutputStream.write(new byte[400]);
- fileOutputStream.close();
- assertEquals(true, insertFile.exists());
- assertEquals(true, restoreFile.exists());
- assertEquals(400, insertFile.length());
- writer.endFile(new FileSchema());
-
- FileOutputStream out = new FileOutputStream(new File(restorePath));
- // write tsfile position using byte[8] which is present one long
- writeRestoreFile(out, 2);
- writeRestoreFile(out, 3);
- byte[] lastPositionBytes = BytesUtils.longToBytes(200);
- out.write(lastPositionBytes);
- out.close();
- writer = new RestorableTsFileIOWriter(processorName, insertPath);
-
- assertEquals(true, insertFile.exists());
- assertEquals(200, insertFile.length());
- assertEquals(insertPath, writer.getInsertFilePath());
- assertEquals(restorePath, writer.getRestoreFilePath());
- writer.endFile(new FileSchema());
- deleteInsertFile();
- }
-
- @Test
- public void testRecover() throws IOException {
- File insertFile = new File(insertPath);
- FileOutputStream fileOutputStream = new FileOutputStream(insertFile);
- fileOutputStream.write(new byte[200]);
- fileOutputStream.close();
-
- File restoreFile = new File(insertPath + ".restore");
- FileOutputStream out = new FileOutputStream(new File(restorePath));
- // write tsfile position using byte[8] which is present one long
- writeRestoreFile(out, 2);
- writeRestoreFile(out, 3);
- byte[] lastPositionBytes = BytesUtils.longToBytes(200);
- out.write(lastPositionBytes);
- out.close();
-
- writer = new RestorableTsFileIOWriter(processorName, insertPath);
- // writer.endFile(new FileSchema());
-
- assertEquals(true, insertFile.exists());
- assertEquals(true, restoreFile.exists());
-
- RestorableTsFileIOWriter tempbufferwriteResource = new RestorableTsFileIOWriter(processorName,
- insertPath);
-
- assertEquals(true, insertFile.exists());
- assertEquals(200, insertFile.length());
- assertEquals(insertPath, tempbufferwriteResource.getInsertFilePath());
- assertEquals(restorePath, tempbufferwriteResource.getRestoreFilePath());
-
- tempbufferwriteResource.endFile(new FileSchema());
- writer.endFile(new FileSchema());
- deleteInsertFile();
- }
-
- @Test
- public void testWriteAndRecover() throws IOException {
- writer = new RestorableTsFileIOWriter(processorName, insertPath);
- FileSchema schema = new FileSchema();
- schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE));
- schema.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT32, TSEncoding.RLE));
-
- // TsFileWriter fileWriter = new TsFileWriter();
- PrimitiveMemTable memTable = new PrimitiveMemTable();
- memTable.write("d1", "s1", TSDataType.INT32, 1, "1");
- memTable.write("d1", "s1", TSDataType.INT32, 2, "1");
- memTable.write("d1", "s2", TSDataType.INT32, 1, "1");
- memTable.write("d1", "s2", TSDataType.INT32, 3, "1");
- memTable.write("d2", "s2", TSDataType.INT32, 2, "1");
- memTable.write("d2", "s2", TSDataType.INT32, 4, "1");
- MemTableFlushUtil.flushMemTable(schema, writer, memTable, 0);
- writer.flush();
- writer.appendMetadata();
- writer.getOutput().close();
-
- // recover
- writer = new RestorableTsFileIOWriter(processorName, insertPath);
- writer.endFile(schema);
-
- TsFileSequenceReader reader = new TsFileSequenceReader(insertPath);
- TsFileMetaData metaData = reader.readFileMetadata();
- assertEquals(2, metaData.getDeviceMap().size());
- List<ChunkGroupMetaData> chunkGroups = reader
- .readTsDeviceMetaData(metaData.getDeviceMap().get("d1"))
- .getChunkGroupMetaDataList();
- assertEquals(1, chunkGroups.size());
-
- List<ChunkMetaData> chunks = chunkGroups.get(0).getChunkMetaDataList();
- assertEquals(2, chunks.size());
- // d1.s1
- assertEquals(chunks.get(0).getStartTime(), 1);
- assertEquals(chunks.get(0).getEndTime(), 2);
- assertEquals(chunks.get(0).getNumOfPoints(), 2);
- // d1.s2
- assertEquals(chunks.get(1).getStartTime(), 1);
- assertEquals(chunks.get(1).getEndTime(), 3);
- assertEquals(chunks.get(1).getNumOfPoints(), 2);
-
- chunkGroups = reader.readTsDeviceMetaData(metaData.getDeviceMap().get("d2")).getChunkGroupMetaDataList();
- assertEquals(1, chunkGroups.size());
- chunks = chunkGroups.get(0).getChunkMetaDataList();
- assertEquals(1, chunks.size());
- // da.s2
- assertEquals(chunks.get(0).getStartTime(), 2);
- assertEquals(chunks.get(0).getEndTime(), 4);
- assertEquals(chunks.get(0).getNumOfPoints(), 2);
-
- reader.close();
- }
-
- @Test
- public void testFlushAndGetMetadata() throws IOException {
- writer = new RestorableTsFileIOWriter(processorName, insertPath);
-
- assertEquals(0,
- writer.getMetadatas(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0,
- MemTableTestUtils.dataType0).size());
-
- IMemTable memTable = new PrimitiveMemTable();
- MemTableTestUtils.produceData(memTable, 10, 100, MemTableTestUtils.deviceId0,
- MemTableTestUtils.measurementId0,
- MemTableTestUtils.dataType0);
-
- MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable, 0);
- writer.flush();
-
- assertEquals(0,
- writer.getMetadatas(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0,
- MemTableTestUtils.dataType0).size());
- writer.appendMetadata();
- assertEquals(1,
- writer.getMetadatas(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0,
- MemTableTestUtils.dataType0).size());
- MemTableTestUtils.produceData(memTable, 200, 300, MemTableTestUtils.deviceId0,
- MemTableTestUtils.measurementId0,
- MemTableTestUtils.dataType0);
- writer.appendMetadata();
- assertEquals(1,
- writer.getMetadatas(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0,
- MemTableTestUtils.dataType0).size());
-
- writer.endFile(MemTableTestUtils.getFileSchema());
- deleteInsertFile();
- }
-
- private void writeRestoreFile(OutputStream out, int metadataNum) throws IOException {
- TsDeviceMetadata tsDeviceMetadata = new TsDeviceMetadata();
- List<ChunkGroupMetaData> appendRowGroupMetaDatas = new ArrayList<>();
- for (int i = 0; i < metadataNum; i++) {
- appendRowGroupMetaDatas.add(new ChunkGroupMetaData("d1", new ArrayList<>(), 0));
- }
- tsDeviceMetadata.setChunkGroupMetadataList(appendRowGroupMetaDatas);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- tsDeviceMetadata.serializeTo(baos);
- // write metadata size using int
- int metadataSize = baos.size();
- out.write(BytesUtils.intToBytes(metadataSize));
- // write metadata
- out.write(baos.toByteArray());
- }
-
- private void deleteInsertFile() {
- try {
- Files.delete(Paths.get(insertPath));
- } catch (IOException e) {
- fail(e.getMessage());
- }
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStoreTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStoreTest.java
deleted file mode 100644
index 891695d..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStoreTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenode;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FileNodeProcessorStoreTest {
-
- private boolean isOverflowed;
- private Map<String, Long> lastUpdateTimeMap;
- private TsFileResource emptyTsFileResource;
- private List<TsFileResource> newFileNodes;
- private int numOfMergeFile;
- private FileNodeProcessorStatus fileNodeProcessorStatus;
-
- private FileNodeProcessorStore fileNodeProcessorStore;
-
- @Before
- public void setUp() throws Exception {
- isOverflowed = true;
- lastUpdateTimeMap = new HashMap<>();
- for (int i = 0; i < 10; i++) {
- lastUpdateTimeMap.put("d" + i, (long) i);
- }
- emptyTsFileResource = TsFileResourceTest.constructTsfileResource();
- newFileNodes = new ArrayList<>();
- for (int i = 0; i < 5; i++) {
- newFileNodes.add(TsFileResourceTest.constructTsfileResource());
- }
- numOfMergeFile = 5;
- fileNodeProcessorStatus = FileNodeProcessorStatus.MERGING_WRITE;
- fileNodeProcessorStore = new FileNodeProcessorStore(isOverflowed, lastUpdateTimeMap,
- emptyTsFileResource, newFileNodes, fileNodeProcessorStatus, numOfMergeFile);
- }
-
- @After
- public void tearDown() throws Exception {
-
- }
-
- @Test
- public void testSerDeialize() throws Exception {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- fileNodeProcessorStore.serialize(outputStream);
- ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
- FileNodeProcessorStore deFileNodeProcessorStore = FileNodeProcessorStore
- .deSerialize(inputStream);
-
- assertEquals(fileNodeProcessorStore.getLastUpdateTimeMap(),
- deFileNodeProcessorStore.getLastUpdateTimeMap());
- assertEquals(fileNodeProcessorStore.getNumOfMergeFile(),
- deFileNodeProcessorStore.getNumOfMergeFile());
- assertEquals(fileNodeProcessorStore.getFileNodeProcessorStatus(),
- deFileNodeProcessorStore.getFileNodeProcessorStatus());
- TsFileResourceTest.assertTsfileRecource(fileNodeProcessorStore.getEmptyTsFileResource(),
- deFileNodeProcessorStore.getEmptyTsFileResource());
- assertEquals(fileNodeProcessorStore.getNewFileNodes().size(),
- deFileNodeProcessorStore.getNewFileNodes().size());
- for (int i = 0; i < fileNodeProcessorStore.getNewFileNodes().size(); i++) {
- TsFileResourceTest.assertTsfileRecource(fileNodeProcessorStore.getNewFileNodes().get(i),
- deFileNodeProcessorStore.getNewFileNodes().get(i));
- }
- }
-
-}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/TsFileResourceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/TsFileResourceTest.java
deleted file mode 100644
index 172ce67..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/TsFileResourceTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenode;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TsFileResourceTest {
-
-
- private TsFileResource tsFileResource;
-
- public static TsFileResource constructTsfileResource() {
- TsFileResource tsFileResource;
- String relativePath = "data/data/settled/b/relativePath";
- Map<String, Long> startTimes = new HashMap<>();
- Map<String, Long> endTimes = new HashMap<>();
-
- tsFileResource = new TsFileResource(Collections.emptyMap(), Collections.emptyMap(),
- new File(relativePath));
- for (int i = 0; i < 10; i++) {
- startTimes.put("d" + i, (long) i);
- }
- for (int i = 0; i < 10; i++) {
- endTimes.put("d" + i, (long) (i + 10));
- }
- tsFileResource.setStartTimeMap(startTimes);
- tsFileResource.setEndTimeMap(endTimes);
- for (int i = 0; i < 5; i++) {
- tsFileResource.addMergeChanged("d" + i);
- }
- return tsFileResource;
- }
-
- @Before
- public void setUp() throws Exception {
- this.tsFileResource = constructTsfileResource();
- }
-
- @After
- public void tearDown() throws Exception {
-
- }
-
- @Test
- public void testSerDeialize() throws Exception {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream(0);
- tsFileResource.serialize(outputStream);
- ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
- TsFileResource deTsfileResource = TsFileResource.deSerialize(inputStream);
- assertTsfileRecource(tsFileResource, deTsfileResource);
- }
- @Test
- public void testSerdeializeCornerCase() throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream(0);
- tsFileResource.setFile(null);
- tsFileResource.serialize(outputStream);
- ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
- TsFileResource deTsfileResource = TsFileResource.deSerialize(inputStream);
- assertTsfileRecource(tsFileResource,deTsfileResource);
- }
-
- public static void assertTsfileRecource(TsFileResource tsFileResource,
- TsFileResource deTsfileResource) {
- assertEquals(tsFileResource.getBaseDirIndex(), deTsfileResource.getBaseDirIndex());
- assertEquals(tsFileResource.getFile(), deTsfileResource.getFile());
- assertEquals(tsFileResource.getOverflowChangeType(), deTsfileResource.getOverflowChangeType());
- assertEquals(tsFileResource.getStartTimeMap(), deTsfileResource.getStartTimeMap());
- assertEquals(tsFileResource.getEndTimeMap(), deTsfileResource.getEndTimeMap());
- assertEquals(tsFileResource.getMergeChanged(), deTsfileResource.getMergeChanged());
- }
-}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodev2/FileNodeManagerBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodev2/FileNodeManagerBenchmark.java
deleted file mode 100644
index 951523c..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodev2/FileNodeManagerBenchmark.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenodev2;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.db.engine.filenode.DatabaseEngine;
-import org.apache.iotdb.db.exception.StorageGroupManagerException;
-import org.apache.iotdb.db.exception.MetadataArgsErrorException;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.sync.test.RandomNum;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
-
-/**
- * Bench The filenode manager with mul-thread and get its performance.
- */
-public class DatabaseEngineBenchmark {
-
- private static int numOfWoker = 10;
- private static int numOfDevice = 10;
- private static int numOfMeasurement = 10;
- private static long numOfTotalLine = 10000000;
- private static CountDownLatch latch = new CountDownLatch(numOfWoker);
- private static AtomicLong atomicLong = new AtomicLong();
-
- private static String[] devices = new String[numOfDevice];
- private static String prefix = "root.bench";
- private static String[] measurements = new String[numOfMeasurement];
-
- static {
- for (int i = 0; i < numOfDevice; i++) {
- devices[i] = prefix + "." + "device_" + i;
- }
- }
-
- static {
- for (int i = 0; i < numOfMeasurement; i++) {
- measurements[i] = "measurement_" + i;
- }
- }
-
- private static void prepare() throws MetadataArgsErrorException, PathErrorException, IOException {
- MManager manager = MManager.getInstance();
- manager.setStorageLevelToMTree(prefix);
- for (String device : devices) {
- for (String measurement : measurements) {
- manager.addPathToMTree(device + "." + measurement, TSDataType.INT64.toString(),
- TSEncoding.PLAIN.toString());
- }
- }
- }
-
- private static void tearDown() throws IOException, StorageGroupManagerException {
- EnvironmentUtils.cleanEnv();
- }
-
- public static void main(String[] args)
- throws InterruptedException, IOException, MetadataArgsErrorException,
- PathErrorException, StorageGroupManagerException {
- tearDown();
- prepare();
- long startTime = System.currentTimeMillis();
- for (int i = 0; i < numOfWoker; i++) {
- Woker woker = new Woker();
- woker.start();
- }
- latch.await();
- long endTime = System.currentTimeMillis();
- tearDown();
- System.out.println(String.format("The total time: %d ms", (endTime - startTime)));
- }
-
- private static TSRecord getRecord(String deltaObjectId, long timestamp) {
- TSRecord tsRecord = new TSRecord(timestamp, deltaObjectId);
- for (String measurement : measurements) {
- tsRecord.addTuple(new LongDataPoint(measurement, timestamp));
- }
- return tsRecord;
- }
-
- private static class Woker extends Thread {
-
- @Override
- public void run() {
- try {
- while (true) {
- long seed = atomicLong.addAndGet(1);
- if (seed > numOfTotalLine) {
- break;
- }
- long time = RandomNum.getRandomLong(1, seed);
- String deltaObject = devices[(int) (time % numOfDevice)];
- TSRecord tsRecord = getRecord(deltaObject, time);
- DatabaseEngine.getInstance().insert(tsRecord, true);
- }
- } catch (StorageGroupManagerException e) {
- e.printStackTrace();
- } finally {
- latch.countDown();
- }
- }
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/MemControllerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/MemControllerTest.java
index a9d48fa..4a582de 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/MemControllerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/MemControllerTest.java
@@ -20,19 +20,15 @@ package org.apache.iotdb.db.engine.memcontrol;
import static org.junit.Assert.assertEquals;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.junit.Test;
public class MemControllerTest {
private static long GB = 1024 * 1024 * 1024L;
private static long MB = 1024 * 1024L;
- private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@Test
- public void test() throws BufferWriteProcessorException {
+ public void test() {
BasicMemController memController = BasicMemController.getInstance();
if (memController instanceof RecordMemController) {
testRecordMemController();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java
deleted file mode 100644
index 51dd69b..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memcontrol;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.MetadataManagerHelper;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.EngineConstants;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
-import org.apache.iotdb.db.exception.OverflowProcessorException;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.utils.FileSchemaUtils;
-import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class OverflowFileSizeControlTest {
-
- private String nameSpacePath = "nsp";
- private Map<String, Action> parameters = null;
- private OverflowProcessor ofprocessor = null;
- private TSFileConfig tsconfig = TSFileDescriptor.getInstance().getConfig();
- private String deviceId = "root.vehicle.d0";
- private String[] measurementIds = {"s0", "s1", "s2", "s3", "s4", "s5"};
- private TSDataType[] dataTypes = {TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT,
- TSDataType.DOUBLE,
- TSDataType.BOOLEAN, TSDataType.TEXT};
-
- private IoTDBConfig dbConfig = IoTDBDescriptor.getInstance().getConfig();
- private long overflowFileSize;
- private int groupSize;
-
- private boolean skip = !false;
-
- private Action overflowflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
-
- private Action filenodeflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
-
- private Action DatabaseEnginebackupaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
-
- private Action DatabaseEngineflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
-
- @Before
- public void setUp() throws Exception {
- parameters = new HashMap<>();
- parameters.put(EngineConstants.OVERFLOW_FLUSH_ACTION, overflowflushaction);
- parameters.put(EngineConstants.FILENODE_PROCESSOR_FLUSH_ACTION, filenodeflushaction);
-
- overflowFileSize = dbConfig.getOverflowFileSizeThreshold();
- groupSize = tsconfig.groupSizeInByte;
- dbConfig.setOverflowFileSizeThreshold(10 * 1024 * 1024);
- tsconfig.groupSizeInByte = 1024 * 1024;
-
- MetadataManagerHelper.initMetadata();
- }
-
- @After
- public void tearDown() throws Exception {
- dbConfig.setOverflowFileSizeThreshold(overflowFileSize);
- tsconfig.groupSizeInByte = groupSize;
- EnvironmentUtils.cleanEnv();
- }
-
- @Test
- public void testInsert() throws InterruptedException, IOException, WriteProcessException {
- if (skip) {
- return;
- }
- // insert one point: int
- try {
- ofprocessor = new OverflowProcessor(nameSpacePath, parameters,
- FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE);
- for (int i = 1; i < 1000000; i++) {
- TSRecord record = new TSRecord(i, deviceId);
- record.addTuple(DataPoint.getDataPoint(dataTypes[0], measurementIds[0], String.valueOf(i)));
- if (i % 100000 == 0) {
- System.out.println(i + "," + MemUtils.bytesCntToStr(ofprocessor.getFileSize()));
- }
- }
- // wait to flush
- Thread.sleep(1000);
- ofprocessor.close();
- assertTrue(ofprocessor.getFileSize() < dbConfig.getOverflowFileSizeThreshold());
- fail("Method unimplemented");
- } catch (OverflowProcessorException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java
deleted file mode 100644
index c4b00ba..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memcontrol;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.MetadataManagerHelper;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.EngineConstants;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
-import org.apache.iotdb.db.exception.OverflowProcessorException;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.utils.FileSchemaUtils;
-import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class OverflowMetaSizeControlTest {
-
- private String nameSpacePath = "nsp";
- private Map<String, Action> parameters = null;
- private OverflowProcessor ofprocessor = null;
- private TSFileConfig tsconfig = TSFileDescriptor.getInstance().getConfig();
- private String deviceId = "root.vehicle.d0";
- private String[] measurementIds = {"s0", "s1", "s2", "s3", "s4", "s5"};
- private TSDataType[] dataTypes = {TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT,
- TSDataType.DOUBLE,
- TSDataType.BOOLEAN, TSDataType.TEXT};
-
- private IoTDBConfig dbConfig = IoTDBDescriptor.getInstance().getConfig();
- private long overflowFileSize;
- private int groupSize;
-
- private boolean skip = !false;
-
- private Action overflowflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
-
- private Action filenodeflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
-
- private Action DatabaseEnginebackupaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
-
- private Action DatabaseEngineflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
-
- @Before
- public void setUp() throws Exception {
- parameters = new HashMap<String, Action>();
- parameters.put(EngineConstants.OVERFLOW_FLUSH_ACTION, overflowflushaction);
- parameters.put(EngineConstants.FILENODE_PROCESSOR_FLUSH_ACTION, filenodeflushaction);
-
- overflowFileSize = dbConfig.getOverflowMetaSizeThreshold();
- groupSize = tsconfig.groupSizeInByte;
- dbConfig.setOverflowMetaSizeThreshold(3 * 1024 * 1024);
- tsconfig.groupSizeInByte = 1024 * 1024;
-
- MetadataManagerHelper.initMetadata();
- }
-
- @After
- public void tearDown() throws Exception {
- dbConfig.setOverflowMetaSizeThreshold(overflowFileSize);
- tsconfig.groupSizeInByte = groupSize;
- EnvironmentUtils.cleanEnv();
- }
-
- @Test
- public void testInsert() throws InterruptedException, IOException, WriteProcessException {
- if (skip) {
- return;
- }
- // insert one point: int
- try {
- ofprocessor = new OverflowProcessor(nameSpacePath, parameters,
- FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE);
- for (int i = 1; i < 1000000; i++) {
- TSRecord record = new TSRecord(i, deviceId);
- record.addTuple(DataPoint.getDataPoint(dataTypes[0], measurementIds[0], String.valueOf(i)));
- ofprocessor.insert(record);
- if (i % 100000 == 0) {
- System.out.println(i + "," + MemUtils.bytesCntToStr(ofprocessor.getMetaSize()));
- }
- }
- // wait to flush
- Thread.sleep(1000);
- assertTrue(ofprocessor.getMetaSize() < dbConfig.getOverflowMetaSizeThreshold());
- ofprocessor.close();
- fail("Method unimplemented");
- } catch (OverflowProcessorException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/TsFileMetaSizeControlTest.java
similarity index 55%
rename from iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java
rename to iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/TsFileMetaSizeControlTest.java
index 3385837..28f2d9f 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/TsFileMetaSizeControlTest.java
@@ -18,72 +18,39 @@
*/
package org.apache.iotdb.db.engine.memcontrol;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.PathUtils;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
-import org.apache.iotdb.db.engine.EngineConstants;
+import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.FileSchemaUtils;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-public class BufferwriteMetaSizeControlTest {
+public class TsFileMetaSizeControlTest {
- Action bfflushaction = new Action() {
+ private TsFileProcessor processor = null;
+ private String nsp = "root.vehicle.d0";
+ private String nsp2 = "root.vehicle.d1";
- @Override
- public void act() throws ActionException {
-
- }
- };
-
- Action bfcloseaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
-
- Action fnflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
-
- }
- };
-
- BufferWriteProcessor processor = null;
- String nsp = "root.vehicle.d0";
- String nsp2 = "root.vehicle.d1";
-
- private boolean cachePageData = false;
private int groupSizeInByte;
private int pageCheckSizeThreshold;
private int pageSizeInByte;
private int maxStringLength;
private long metaSizeThreshold;
private long memMonitorInterval;
- private TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
private IoTDBConfig dbConfig = IoTDBDescriptor.getInstance().getConfig();
private boolean skip = !false;
@@ -91,17 +58,17 @@ public class BufferwriteMetaSizeControlTest {
@Before
public void setUp() throws Exception {
// origin value
- groupSizeInByte = TsFileConf.groupSizeInByte;
- pageCheckSizeThreshold = TsFileConf.pageCheckSizeThreshold;
- pageSizeInByte = TsFileConf.pageSizeInByte;
- maxStringLength = TsFileConf.maxStringLength;
+ groupSizeInByte = TSFileConfig.groupSizeInByte;
+ pageCheckSizeThreshold = TSFileConfig.pageCheckSizeThreshold;
+ pageSizeInByte = TSFileConfig.pageSizeInByte;
+ maxStringLength = TSFileConfig.maxStringLength;
metaSizeThreshold = dbConfig.getBufferwriteFileSizeThreshold();
memMonitorInterval = dbConfig.getMemMonitorInterval();
// new value
- TsFileConf.groupSizeInByte = 200000;
- TsFileConf.pageCheckSizeThreshold = 3;
- TsFileConf.pageSizeInByte = 10000;
- TsFileConf.maxStringLength = 2;
+ TSFileConfig.groupSizeInByte = 200000;
+ TSFileConfig.pageCheckSizeThreshold = 3;
+ TSFileConfig.pageSizeInByte = 10000;
+ TSFileConfig.maxStringLength = 2;
dbConfig.setBufferwriteMetaSizeThreshold(1024 * 1024);
BasicMemController.getInstance().setCheckInterval(600 * 1000);
// init metadata
@@ -111,10 +78,10 @@ public class BufferwriteMetaSizeControlTest {
@After
public void tearDown() throws Exception {
// recovery value
- TsFileConf.groupSizeInByte = groupSizeInByte;
- TsFileConf.pageCheckSizeThreshold = pageCheckSizeThreshold;
- TsFileConf.pageSizeInByte = pageSizeInByte;
- TsFileConf.maxStringLength = maxStringLength;
+ TSFileConfig.groupSizeInByte = groupSizeInByte;
+ TSFileConfig.pageCheckSizeThreshold = pageCheckSizeThreshold;
+ TSFileConfig.pageSizeInByte = pageSizeInByte;
+ TSFileConfig.maxStringLength = maxStringLength;
dbConfig.setBufferwriteMetaSizeThreshold(metaSizeThreshold);
BasicMemController.getInstance().setCheckInterval(memMonitorInterval);
// clean environment
@@ -122,31 +89,25 @@ public class BufferwriteMetaSizeControlTest {
}
@Test
- public void test() throws BufferWriteProcessorException, WriteProcessException {
+ public void test() throws WriteProcessException, TsFileProcessorException {
if (skip) {
return;
}
String filename = "bufferwritetest";
new File(filename).delete();
- Map<String, Action> parameters = new HashMap<>();
- parameters.put(EngineConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
- parameters.put(EngineConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
- parameters.put(EngineConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
-
try {
- processor = new BufferWriteProcessor(Directories.getInstance().getTsFolderForTest(), nsp,
- filename,
- parameters, SysTimeVersionController.INSTANCE, FileSchemaUtils.constructFileSchema(nsp));
- } catch (BufferWriteProcessorException e) {
+ processor = new TsFileProcessor(nsp, SysTimeVersionController.INSTANCE,
+ FileSchemaUtils.constructFileSchema(nsp));
+ } catch (TsFileProcessorException e) {
e.printStackTrace();
fail(e.getMessage());
}
File nspdir = PathUtils.getBufferWriteDir(nsp);
- assertEquals(true, nspdir.isDirectory());
+ assertTrue(nspdir.isDirectory());
for (int i = 0; i < 1000000; i++) {
- processor.write(nsp, "s1", i * i, TSDataType.INT64, i + "");
- processor.write(nsp2, "s1", i * i, TSDataType.INT64, i + "");
+ processor.insert(new InsertPlan(nsp, i * i, "s1", i + ""));
+ processor.insert(new InsertPlan(nsp2, i * i, "s1", i + ""));
if (i % 100000 == 0) {
System.out.println(i + "," + MemUtils.bytesCntToStr(processor.getMetaSize()));
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/TsFileSizeControlTest.java
similarity index 53%
rename from iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java
rename to iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/TsFileSizeControlTest.java
index 36035d2..b7d369c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/TsFileSizeControlTest.java
@@ -18,72 +18,39 @@
*/
package org.apache.iotdb.db.engine.memcontrol;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.PathUtils;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
-import org.apache.iotdb.db.engine.EngineConstants;
+import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.FileSchemaUtils;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-public class BufferwriteFileSizeControlTest {
+public class TsFileSizeControlTest {
- Action bfflushaction = new Action() {
+ private TsFileProcessor processor = null;
+ private String nsp = "root.vehicle.d0";
+ private String nsp2 = "root.vehicle.d1";
- @Override
- public void act() throws ActionException {
-
- }
- };
-
- Action bfcloseaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
-
- Action fnflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
-
- }
- };
-
- BufferWriteProcessor processor = null;
- String nsp = "root.vehicle.d0";
- String nsp2 = "root.vehicle.d1";
-
- private boolean cachePageData = false;
private int groupSizeInByte;
private int pageCheckSizeThreshold;
private int pageSizeInByte;
private int maxStringLength;
private long fileSizeThreshold;
private long memMonitorInterval;
- private TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
private IoTDBConfig dbConfig = IoTDBDescriptor.getInstance().getConfig();
private boolean skip = !false;
@@ -91,17 +58,17 @@ public class BufferwriteFileSizeControlTest {
@Before
public void setUp() throws Exception {
// origin value
- groupSizeInByte = TsFileConf.groupSizeInByte;
- pageCheckSizeThreshold = TsFileConf.pageCheckSizeThreshold;
- pageSizeInByte = TsFileConf.pageSizeInByte;
- maxStringLength = TsFileConf.maxStringLength;
+ groupSizeInByte = TSFileConfig.groupSizeInByte;
+ pageCheckSizeThreshold = TSFileConfig.pageCheckSizeThreshold;
+ pageSizeInByte = TSFileConfig.pageSizeInByte;
+ maxStringLength = TSFileConfig.maxStringLength;
fileSizeThreshold = dbConfig.getBufferwriteFileSizeThreshold();
memMonitorInterval = dbConfig.getMemMonitorInterval();
// new value
- TsFileConf.groupSizeInByte = 200000;
- TsFileConf.pageCheckSizeThreshold = 3;
- TsFileConf.pageSizeInByte = 10000;
- TsFileConf.maxStringLength = 2;
+ TSFileConfig.groupSizeInByte = 200000;
+ TSFileConfig.pageCheckSizeThreshold = 3;
+ TSFileConfig.pageSizeInByte = 10000;
+ TSFileConfig.maxStringLength = 2;
dbConfig.setBufferwriteFileSizeThreshold(5 * 1024 * 1024);
BasicMemController.getInstance().setCheckInterval(600 * 1000);
// init metadata
@@ -111,10 +78,10 @@ public class BufferwriteFileSizeControlTest {
@After
public void tearDown() throws Exception {
// recovery value
- TsFileConf.groupSizeInByte = groupSizeInByte;
- TsFileConf.pageCheckSizeThreshold = pageCheckSizeThreshold;
- TsFileConf.pageSizeInByte = pageSizeInByte;
- TsFileConf.maxStringLength = maxStringLength;
+ TSFileConfig.groupSizeInByte = groupSizeInByte;
+ TSFileConfig.pageCheckSizeThreshold = pageCheckSizeThreshold;
+ TSFileConfig.pageSizeInByte = pageSizeInByte;
+ TSFileConfig.maxStringLength = maxStringLength;
dbConfig.setBufferwriteFileSizeThreshold(fileSizeThreshold);
BasicMemController.getInstance().setCheckInterval(memMonitorInterval);
// clean environment
@@ -122,33 +89,28 @@ public class BufferwriteFileSizeControlTest {
}
@Test
- public void test() throws BufferWriteProcessorException, WriteProcessException {
+ public void test() throws WriteProcessException, TsFileProcessorException {
if (skip) {
return;
}
String filename = "bufferwritetest";
+ //noinspection ResultOfMethodCallIgnored
new File(filename).delete();
- Map<String, Action> parameters = new HashMap<>();
- parameters.put(EngineConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
- parameters.put(EngineConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
- parameters.put(EngineConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
-
try {
- processor = new BufferWriteProcessor(Directories.getInstance().getTsFolderForTest(), nsp,
- filename,
- parameters, SysTimeVersionController.INSTANCE, FileSchemaUtils.constructFileSchema(nsp));
- } catch (BufferWriteProcessorException e) {
+ processor = new TsFileProcessor(nsp, SysTimeVersionController.INSTANCE,
+ FileSchemaUtils.constructFileSchema(nsp));
+ } catch (TsFileProcessorException e) {
e.printStackTrace();
fail(e.getMessage());
}
File nspdir = PathUtils.getBufferWriteDir(nsp);
- assertEquals(true, nspdir.isDirectory());
+ assertTrue(nspdir.isDirectory());
for (int i = 0; i < 1000000; i++) {
- processor.write(nsp, "s1", i * i, TSDataType.INT64, i + "");
- processor.write(nsp2, "s1", i * i, TSDataType.INT64, i + "");
+ processor.insert(new InsertPlan(nsp,i * i, "s1", i + ""));
+ processor.insert(new InsertPlan(nsp2, i * i, "s1", i + ""));
if (i % 100000 == 0) {
- System.out.println(i + "," + MemUtils.bytesCntToStr(processor.getFileSize()));
+ System.out.println(i + "," + MemUtils.bytesCntToStr(processor.currentFileSize()));
}
}
// wait to flush end
@@ -158,7 +120,7 @@ public class BufferwriteFileSizeControlTest {
e.printStackTrace();
}
processor.close();
- assertTrue(processor.getFileSize() < dbConfig.getBufferwriteFileSizeThreshold());
+ assertTrue(processor.currentFileSize() < dbConfig.getBufferwriteFileSizeThreshold());
fail("Method unimplemented");
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index d08af05..15b0280 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.engine.modification;
-import static junit.framework.TestCase.assertTrue;
import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
import static org.junit.Assert.assertEquals;
@@ -29,16 +28,17 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import junit.framework.TestCase;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.filenode.DatabaseEngine;
-import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.engine.datasource.QueryDataSource;
-import org.apache.iotdb.db.exception.StorageGroupManagerException;
-import org.apache.iotdb.db.exception.MetadataArgsErrorException;
+import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
@@ -48,8 +48,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -69,7 +67,7 @@ public class DeletionFileNodeTest {
}
@Before
- public void setup() throws MetadataArgsErrorException,
+ public void setup() throws
PathErrorException, IOException, StorageGroupManagerException, StartupException {
EnvironmentUtils.envSetUp();
@@ -77,7 +75,7 @@ public class DeletionFileNodeTest {
for (int i = 0; i < 10; i++) {
MManager.getInstance().addPathToMTree(processorName + "." + measurements[i], dataType,
encoding);
- DatabaseEngine.getInstance()
+ DatabaseEngineFactory.getCurrent()
.addTimeSeries(new Path(processorName, measurements[i]), TSDataType.valueOf(dataType),
TSEncoding.valueOf(encoding), CompressionType.valueOf(TSFileConfig.compressor),
Collections.emptyMap());
@@ -94,17 +92,18 @@ public class DeletionFileNodeTest {
StorageGroupManagerException {
for (int i = 1; i <= 100; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().delete(processorName, measurements[3], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[4], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 30);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[3], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[4], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 30);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 50);
SingleSeriesExpression expression = new SingleSeriesExpression(new Path(processorName,
measurements[5]), null);
@@ -126,22 +125,23 @@ public class DeletionFileNodeTest {
@Test
public void testDeleteInBufferWriteFile() throws StorageGroupManagerException, IOException {
for (int i = 1; i <= 100; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().closeAll();
+ DatabaseEngineFactory.getCurrent().closeAll();
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[4], 40);
- DatabaseEngine.getInstance().delete(processorName, measurements[3], 30);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[4], 40);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[3], 30);
Modification[] realModifications = new Modification[]{
- new Deletion(processorName + "." + measurements[5], 102, 50),
- new Deletion(processorName + "." + measurements[4], 103, 40),
- new Deletion(processorName + "." + measurements[3], 104, 30),
+ new Deletion(processorName + "." + measurements[5], 103, 50),
+ new Deletion(processorName + "." + measurements[4], 105, 40),
+ new Deletion(processorName + "." + measurements[3], 107, 30),
};
String fileNodePath = Directories.getInstance().getTsFileFolder(0) + File.separator
@@ -149,16 +149,16 @@ public class DeletionFileNodeTest {
File fileNodeDir = new File(fileNodePath);
File[] modFiles = fileNodeDir.listFiles((dir, name)
-> name.endsWith(ModificationFile.FILE_SUFFIX));
- assertEquals(modFiles.length, 1);
+ assertEquals(1, modFiles.length);
LocalTextModificationAccessor accessor =
new LocalTextModificationAccessor(modFiles[0].getPath());
try {
Collection<Modification> modifications = accessor.read();
- assertEquals(modifications.size(), 3);
+ assertEquals( 3, modifications.size());
int i = 0;
for (Modification modification : modifications) {
- assertTrue(modification.equals(realModifications[i++]));
+ assertEquals(realModifications[i++], modification);
}
} finally {
accessor.close();
@@ -169,27 +169,29 @@ public class DeletionFileNodeTest {
public void testDeleteInOverflowCache() throws StorageGroupManagerException {
// insert into BufferWrite
for (int i = 101; i <= 200; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().closeAll();
+ DatabaseEngineFactory.getCurrent().closeAll();
// insert into Overflow
for (int i = 1; i <= 100; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().delete(processorName, measurements[3], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[4], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 30);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[3], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[4], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 30);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 50);
SingleSeriesExpression expression = new SingleSeriesExpression(new Path(processorName,
measurements[5]), null);
@@ -199,7 +201,7 @@ public class DeletionFileNodeTest {
.getQueryDataSource(expression.getSeriesPath(), TEST_QUERY_CONTEXT);
Iterator<TimeValuePair> timeValuePairs =
- dataSource.getOverflowSeriesDataSource().getReadableMemChunk().getIterator();
+ dataSource.getOverflowSeriesDataSource().getReadableChunk().getIterator();
int count = 0;
while (timeValuePairs.hasNext()) {
timeValuePairs.next();
@@ -214,48 +216,50 @@ public class DeletionFileNodeTest {
public void testDeleteInOverflowFile() throws StorageGroupManagerException, IOException {
// insert into BufferWrite
for (int i = 101; i <= 200; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().closeAll();
+ DatabaseEngineFactory.getCurrent().closeAll();
// insert into Overflow
for (int i = 1; i <= 100; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().closeAll();
+ DatabaseEngineFactory.getCurrent().closeAll();
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[4], 40);
- DatabaseEngine.getInstance().delete(processorName, measurements[3], 30);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[4], 40);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[3], 30);
Modification[] realModifications = new Modification[]{
new Deletion(processorName + "." + measurements[5], 103, 50),
- new Deletion(processorName + "." + measurements[4], 104, 40),
- new Deletion(processorName + "." + measurements[3], 105, 30),
+ new Deletion(processorName + "." + measurements[4], 105, 40),
+ new Deletion(processorName + "." + measurements[3], 107, 30),
};
- String fileNodePath = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDirs()[0] + File.separator
- + processorName + File.separator + "0" + File.separator;
+ String fileNodePath = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDirs()[0] +
+ File.separator + processorName + File.separator;
File fileNodeDir = new File(fileNodePath);
File[] modFiles = fileNodeDir.listFiles((dir, name)
-> name.endsWith(ModificationFile.FILE_SUFFIX));
- assertEquals(modFiles.length, 1);
+ assertEquals(1, modFiles.length);
LocalTextModificationAccessor accessor =
new LocalTextModificationAccessor(modFiles[0].getPath());
Collection<Modification> modifications = accessor.read();
- assertEquals(modifications.size(), 3);
+ assertEquals(3, modifications.size());
int i = 0;
for (Modification modification : modifications) {
- assertTrue(modification.equals(realModifications[i++]));
+ TestCase.assertEquals(realModifications[i++], modification);
}
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
index 21f00db..fef39ba 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
@@ -26,13 +26,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.iotdb.db.engine.filenode.DatabaseEngine;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
-import org.apache.iotdb.db.exception.StorageGroupManagerException;
-import org.apache.iotdb.db.exception.MetadataArgsErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.executor.EngineQueryRouter;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -42,8 +42,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -64,7 +62,7 @@ public class DeletionQueryTest {
}
@Before
- public void setup() throws MetadataArgsErrorException,
+ public void setup() throws
PathErrorException, IOException, StorageGroupManagerException, StartupException {
EnvironmentUtils.envSetUp();
@@ -72,7 +70,7 @@ public class DeletionQueryTest {
for (int i = 0; i < 10; i++) {
MManager.getInstance().addPathToMTree(processorName + "." + measurements[i], dataType,
encoding);
- DatabaseEngine.getInstance()
+ DatabaseEngineFactory.getCurrent()
.addTimeSeries(new Path(processorName, measurements[i]), TSDataType.valueOf(dataType),
TSEncoding.valueOf(encoding), CompressionType.valueOf(TSFileConfig.compressor),
Collections.emptyMap());
@@ -89,17 +87,18 @@ public class DeletionQueryTest {
StorageGroupManagerException, IOException {
for (int i = 1; i <= 100; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().delete(processorName, measurements[3], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[4], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 30);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[3], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[4], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 30);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 50);
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
@@ -120,17 +119,18 @@ public class DeletionQueryTest {
@Test
public void testDeleteInBufferWriteFile() throws StorageGroupManagerException, IOException {
for (int i = 1; i <= 100; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().closeAll();
+ DatabaseEngineFactory.getCurrent().closeAll();
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[4], 40);
- DatabaseEngine.getInstance().delete(processorName, measurements[3], 30);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[4], 40);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[3], 30);
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
@@ -152,27 +152,29 @@ public class DeletionQueryTest {
public void testDeleteInOverflowCache() throws StorageGroupManagerException, IOException {
// insert into BufferWrite
for (int i = 101; i <= 200; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().closeAll();
+ DatabaseEngineFactory.getCurrent().closeAll();
// insert into Overflow
for (int i = 1; i <= 100; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().delete(processorName, measurements[3], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[4], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 30);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[3], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[4], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 30);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 50);
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
@@ -194,27 +196,29 @@ public class DeletionQueryTest {
public void testDeleteInOverflowFile() throws StorageGroupManagerException, IOException {
// insert into BufferWrite
for (int i = 101; i <= 200; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().closeAll();
+ DatabaseEngineFactory.getCurrent().closeAll();
// insert into Overflow
for (int i = 1; i <= 100; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().closeAll();
+ DatabaseEngineFactory.getCurrent().closeAll();
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[4], 40);
- DatabaseEngine.getInstance().delete(processorName, measurements[3], 30);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[4], 40);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[3], 30);
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
@@ -234,51 +238,54 @@ public class DeletionQueryTest {
@Test
public void testSuccessiveDeletion()
- throws StorageGroupManagerException, IOException, InterruptedException {
+ throws StorageGroupManagerException, IOException {
for (int i = 1; i <= 100; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().delete(processorName, measurements[3], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[4], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 30);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[3], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[4], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 30);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 50);
- DatabaseEngine.getInstance().forceFlush(UsageLevel.DANGEROUS);
+ DatabaseEngineFactory.getCurrent().forceFlush(UsageLevel.DANGEROUS);
for (int i = 101; i <= 200; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().delete(processorName, measurements[3], 250);
- DatabaseEngine.getInstance().delete(processorName, measurements[4], 250);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 230);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 250);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[3], 250);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[4], 250);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 230);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 250);
- DatabaseEngine.getInstance().forceFlush(UsageLevel.DANGEROUS);
+ DatabaseEngineFactory.getCurrent().forceFlush(UsageLevel.DANGEROUS);
for (int i = 201; i <= 300; i++) {
- TSRecord record = new TSRecord(i, processorName);
- for (int j = 0; j < 10; j++) {
- record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+ String[] values = new String[measurements.length];
+ for (int j = 0; j < measurements.length; j++) {
+ values[j] = String.valueOf(i * 1.0);
}
- DatabaseEngine.getInstance().insert(record, false);
+ InsertPlan plan = new InsertPlan(processorName, i, measurements, values);
+ DatabaseEngineFactory.getCurrent().insert(plan, false);
}
- DatabaseEngine.getInstance().delete(processorName, measurements[3], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[4], 50);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 30);
- DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[3], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[4], 50);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 30);
+ DatabaseEngineFactory.getCurrent().deleteData(processorName, measurements[5], 50);
- DatabaseEngine.getInstance().closeAll();
+ DatabaseEngineFactory.getCurrent().closeAll();
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowIOTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowIOTest.java
deleted file mode 100644
index 6c2ffc8..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowIOTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import org.apache.iotdb.tsfile.read.reader.TsFileInput;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class OverflowIOTest {
-
- private String overflowFilePath = "overflowfile";
- private OverflowIO io = null;
- private TsFileInput reader = null;
-
- @Before
- public void setUp() throws Exception {
- io = new OverflowIO(new OverflowIO.OverflowReadWriter(overflowFilePath));
- reader = new OverflowIO.OverflowReadWriter(overflowFilePath);
- }
-
- @After
- public void tearDown() throws Exception {
- io.close();
- reader.close();
- File file = new File(overflowFilePath);
- file.delete();
- }
-
- @Test
- public void testFileCutoff() throws IOException {
- File file = new File("testoverflowfile");
- FileOutputStream fileOutputStream = new FileOutputStream(file);
- byte[] bytes = new byte[20];
- fileOutputStream.write(bytes);
- fileOutputStream.close();
- assertEquals(20, file.length());
- OverflowIO overflowIO = new OverflowIO(new OverflowIO.OverflowReadWriter(file.getPath()));
- assertEquals(20, file.length());
- overflowIO.close();
- file.delete();
- }
-
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtableTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtableTest.java
deleted file mode 100644
index 7a4ea05..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtableTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Collections;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class OverflowMemtableTest {
-
- private OverflowMemtable support = new OverflowMemtable();
- private String deviceId1 = "d1";
- private String deviceId2 = "d2";
- private String measurementId1 = "s1";
- private String measurementId2 = "s2";
- private TSDataType dataType1 = TSDataType.INT32;
- private TSDataType dataType2 = TSDataType.FLOAT;
- private float error = 0.000001f;
-
- @Before
- public void setUp() throws Exception {
-
- assertEquals(true, support.isEmptyOfOverflowSeriesMap());
- assertEquals(true, support.isEmptyOfMemTable());
- // d1 s1
- support.update(deviceId1, measurementId1, 2, 10, dataType1, BytesUtils.intToBytes(10));
- support.update(deviceId1, measurementId1, 20, 30, dataType1, BytesUtils.intToBytes(20));
- // time :[2,10] [20,30] value: int [10,10] int[20,20]
- // d1 s2
- support.delete(deviceId1, measurementId2, 10, false);
- support.update(deviceId1, measurementId2, 20, 30, dataType1, BytesUtils.intToBytes(20));
- // time: [0,-10] [20,30] value[20,20]
- // d2 s1
- support.update(deviceId2, measurementId1, 10, 20, dataType2, BytesUtils.floatToBytes(10.5f));
- support.update(deviceId2, measurementId1, 15, 40, dataType2, BytesUtils.floatToBytes(20.5f));
- // time: [5,9] [10,40] value [10.5,10.5] [20.5,20.5]
- // d2 s2
- support.update(deviceId2, measurementId2, 2, 10, dataType2, BytesUtils.floatToBytes(5.5f));
- support.delete(deviceId2, measurementId2, 20, false);
- // time : [0,-20]
-
- }
-
- @After
- public void tearDown() throws Exception {
- support.clear();
- }
-
- @Test
- public void testInsert() {
- support.clear();
- assertEquals(true, support.isEmptyOfMemTable());
- OverflowTestUtils.produceInsertData(support);
- assertEquals(false, support.isEmptyOfMemTable());
-
- int num = 1;
- for (TimeValuePair pair : support
- .queryOverflowInsertInMemory(deviceId1, measurementId1, dataType1, Collections.emptyMap())
- .getSortedTimeValuePairList()) {
- assertEquals(num, pair.getTimestamp());
- assertEquals(num, pair.getValue().getInt());
- num++;
- }
- num = 1;
- for (TimeValuePair pair : support
- .queryOverflowInsertInMemory(deviceId2, measurementId2, dataType2, Collections.emptyMap())
- .getSortedTimeValuePairList()) {
- assertEquals(num, pair.getTimestamp());
- if (num == 2) {
- assertEquals(10.5, pair.getValue().getFloat(), error);
- } else {
- assertEquals(5.5, pair.getValue().getFloat(), error);
- }
- num++;
- }
- }
-
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java
deleted file mode 100644
index c968f54..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.io.FileUtils;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.EngineConstants;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.OverflowProcessorException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-/**
- * Overflow Insert Benchmark. This class is used to bench overflow processor module and gets its performance.
- */
-public class OverflowProcessorBenchmark {
-
- private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig();
-
- private static int numOfDevice = 100;
- private static int numOfMeasurement = 100;
- private static int numOfPoint = 1000;
-
- private static String[] deviceIds = new String[numOfDevice];
- private static String[] measurementIds = new String[numOfMeasurement];
- private static FileSchema fileSchema = new FileSchema();
- private static TSDataType tsDataType = TSDataType.INT64;
-
- static {
- for (int i = 0; i < numOfDevice; i++) {
- deviceIds[i] = String.valueOf("d" + i);
- }
- }
-
- static {
- for (int i = 0; i < numOfMeasurement; i++) {
- measurementIds[i] = String.valueOf("m" + i);
- MeasurementSchema measurementDescriptor = new MeasurementSchema("m" + i, tsDataType,
- TSEncoding.PLAIN);
- assert measurementDescriptor.getCompressor() != null;
- fileSchema.registerMeasurement(measurementDescriptor);
-
- }
- }
-
- private static void before() throws IOException {
- for (String file : TsFileDBConf.getOverflowDataDirs()) {
- FileUtils.deleteDirectory(new File(file));
- }
- }
-
- private static void after() throws IOException {
- before();
- }
-
- public static void main(String[] args) throws IOException, OverflowProcessorException {
- Map<String, Action> parameters = new HashMap<>();
- parameters.put(EngineConstants.OVERFLOW_FLUSH_ACTION, new Action() {
- @Override
- public void act() throws ActionException {
- System.out.println(EngineConstants.OVERFLOW_FLUSH_ACTION);
- }
- });
- parameters.put(EngineConstants.FILENODE_PROCESSOR_FLUSH_ACTION, new Action() {
- @Override
- public void act() throws ActionException {
- System.out.println(EngineConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
- }
- });
- OverflowProcessor overflowProcessor = new OverflowProcessor("Overflow_bench", parameters,
- fileSchema, SysTimeVersionController.INSTANCE);
- long startTime = System.currentTimeMillis();
- for (int i = 0; i < numOfPoint; i++) {
- for (int j = 0; j < numOfDevice; j++) {
- TSRecord tsRecord = getRecord(deviceIds[j]);
- overflowProcessor.insert(tsRecord);
- }
- }
- long endTime = System.currentTimeMillis();
- overflowProcessor.close();
- System.out.println(String.format(
- "Num of time series: %d, " + "Num of points for each time series: %d, "
- + "The total time: %d ms. ",
- numOfMeasurement * numOfDevice, numOfPoint, endTime - startTime));
-
- after();
- }
-
- private static TSRecord getRecord(String deviceId) {
- long time = System.nanoTime();
- long value = System.nanoTime();
- TSRecord tsRecord = new TSRecord(time, deviceId);
- for (String measurement : measurementIds) {
- tsRecord.addTuple(new LongDataPoint(measurement, value));
- }
- return tsRecord;
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
deleted file mode 100644
index 340b48e..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.engine.PathUtils;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.EngineConstants;
-import org.apache.iotdb.db.engine.datasource.MergeSeriesDataSource;
-import org.apache.iotdb.db.engine.datasource.OverflowSeriesDataSource;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.OverflowProcessorException;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class OverflowProcessorTest {
-
- private String processorName = "test";
- private OverflowProcessor processor = null;
- private Map<String, Action> parameters = null;
-
- private Action overflowflushaction = new Action() {
- @Override
- public void act() throws ActionException {
- System.out.println("overflow flush action");
- }
- };
-
- private Action filenodeflushaction = new Action() {
- @Override
- public void act() throws ActionException {
- System.out.println("filenode flush action");
- }
- };
-
- @Before
- public void setUp() throws Exception {
- EnvironmentUtils.envSetUp();
- parameters = new HashMap<String, Action>();
- parameters.put(EngineConstants.OVERFLOW_FLUSH_ACTION, overflowflushaction);
- parameters.put(EngineConstants.FILENODE_PROCESSOR_FLUSH_ACTION, filenodeflushaction);
- }
-
- @After
- public void tearDown() throws Exception {
- EnvironmentUtils.cleanEnv();
- }
-
- @Test
- public void testInsertUpdate()
- throws IOException, OverflowProcessorException, InterruptedException {
- processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(),
- SysTimeVersionController.INSTANCE);
- assertEquals(true, new File(PathUtils.getOverflowWriteDir(processorName),
- "0").exists());
- assertEquals(false, processor.isFlush());
- assertEquals(false, processor.isMerge());
- QueryContext context = new QueryContext();
- // write update data
- OverflowSeriesDataSource overflowSeriesDataSource = processor.query(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1, Collections.emptyMap(),
- context);
- assertEquals(OverflowTestUtils.dataType1, overflowSeriesDataSource.getDataType());
- Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
- assertEquals(1, overflowSeriesDataSource.getOverflowInsertFileList().size());
- assertEquals(0,
- overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
- processor.flush();
- assertEquals(false, processor.isMerge());
- // write insert data
- OverflowTestUtils.produceInsertData(processor);
- TimeUnit.SECONDS.sleep(1);
- assertEquals(false, processor.isFlush());
- overflowSeriesDataSource = processor
- .query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1, Collections.emptyMap(), context);
- assertEquals(OverflowTestUtils.dataType1, overflowSeriesDataSource.getDataType());
- Assert.assertEquals(false, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
- assertEquals(1, overflowSeriesDataSource.getOverflowInsertFileList().size());
- Iterator<TimeValuePair> iterator = overflowSeriesDataSource.getReadableMemChunk().getIterator();
- for (int i = 1; i <= 3; i++) {
- assertEquals(true, iterator.hasNext());
- TimeValuePair pair = iterator.next();
- assertEquals(i, pair.getTimestamp());
- assertEquals(i, pair.getValue().getInt());
- }
- // flush synchronously
- processor.close();
- processor.reopen();
- overflowSeriesDataSource = processor
- .query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1, Collections.emptyMap(), context);
- Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
- assertEquals(1, overflowSeriesDataSource.getOverflowInsertFileList().size());
- assertEquals(1,
- overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
- processor.switchWorkToMerge();
- overflowSeriesDataSource = processor
- .query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1, Collections.emptyMap(), context);
- assertEquals(2, overflowSeriesDataSource.getOverflowInsertFileList().size());
- assertEquals(1,
- overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
- assertEquals(0,
- overflowSeriesDataSource.getOverflowInsertFileList().get(1).getChunkMetaDataList().size());
- assertEquals(true, processor.isMerge());
- assertEquals(false, processor.canBeClosed());
- MergeSeriesDataSource mergeSeriesDataSource = processor.queryMerge(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1, context);
- assertEquals(1, mergeSeriesDataSource.getInsertFile().getChunkMetaDataList().size());
- processor.switchMergeToWork();
- overflowSeriesDataSource = processor
- .query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1, Collections.emptyMap(), context);
- processor.close();
- processor.clear();
- }
-
- @Test
- public void testWriteMemoryAndQuery() throws IOException, OverflowProcessorException {
- processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(),
- SysTimeVersionController.INSTANCE);
- OverflowTestUtils.produceInsertData(processor);
- processor.close();
- QueryContext context = new QueryContext();
- // test query
- processor.reopen();
- OverflowSeriesDataSource overflowSeriesDataSource = processor.query(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2, Collections.emptyMap(),
- context);
- Assert.assertTrue(overflowSeriesDataSource.getReadableMemChunk().isEmpty());
- assertEquals(0,
- overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
- processor.clear();
- }
-
- @Test
- public void testFlushAndQuery() throws IOException, OverflowProcessorException {
- processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(),
- SysTimeVersionController.INSTANCE);
- processor.flush();
- // waiting for the end of flush.
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- }
- QueryContext context = new QueryContext();
- processor.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1, Collections.emptyMap(), context);
- OverflowTestUtils.produceInsertData(processor);
- processor.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType2, Collections.emptyMap(), context);
- processor.close();
- processor.clear();
- }
-
- @Test
- public void testRecovery() throws OverflowProcessorException, IOException {
- processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(),
- SysTimeVersionController.INSTANCE);
- processor.close();
- processor.switchWorkToMerge();
- assertEquals(true, processor.isMerge());
- processor.clear();
- OverflowProcessor overflowProcessor = new OverflowProcessor(processorName, parameters,
- OverflowTestUtils.getFileSchema(), SysTimeVersionController.INSTANCE);
- // recovery query
- assertEquals(false, overflowProcessor.isMerge());
- overflowProcessor.switchWorkToMerge();
- QueryContext context = new QueryContext();
- OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor
- .query(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1, Collections.emptyMap(),
- context);
- Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
- assertEquals(2, overflowSeriesDataSource.getOverflowInsertFileList().size());
- overflowProcessor.switchMergeToWork();
- overflowProcessor.close();
- overflowProcessor.clear();
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
deleted file mode 100644
index c7662bf..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class OverflowResourceTest {
-
- private OverflowResource work;
- private File insertFile;
- private String insertFileName = "unseqTsFile";
- private String folderPath = "overflow";
- private String dataPath = "1";
- private OverflowMemtable memtable = new OverflowMemtable();
-
- @Before
- public void setUp() throws Exception {
- work = new OverflowResource(folderPath, dataPath, SysTimeVersionController.INSTANCE);
- insertFile = new File(new File(folderPath, dataPath), insertFileName);
- }
-
- @After
- public void tearDown() throws Exception {
- work.close();
- memtable.clear();
- EnvironmentUtils.cleanDir(folderPath);
- }
-
- @Test
- public void testOverflowInsert() throws IOException {
- OverflowTestUtils.produceInsertData(memtable);
- QueryContext context = new QueryContext();
- work.flush(OverflowTestUtils.getFileSchema(), memtable.getMemTabale(), "processorName");
- List<ChunkMetaData> chunkMetaDatas = work.getInsertMetadatas(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2, context);
- assertEquals(0, chunkMetaDatas.size());
- work.appendMetadatas();
- chunkMetaDatas = work
- .getInsertMetadatas(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1, context);
- assertEquals(1, chunkMetaDatas.size());
- ChunkMetaData chunkMetaData = chunkMetaDatas.get(0);
- assertEquals(OverflowTestUtils.dataType1, chunkMetaData.getTsDataType());
- assertEquals(OverflowTestUtils.measurementId1, chunkMetaData.getMeasurementUid());
- // close
- work.close();
- // append file
- long originlength = insertFile.length();
- FileOutputStream fileOutputStream = new FileOutputStream(insertFile, true);
- fileOutputStream.write(new byte[20]);
- fileOutputStream.close();
- assertEquals(originlength + 20, insertFile.length());
- work = new OverflowResource(folderPath, dataPath, SysTimeVersionController.INSTANCE);
- chunkMetaDatas = work
- .getInsertMetadatas(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1, context);
- assertEquals(1, chunkMetaDatas.size());
- chunkMetaData = chunkMetaDatas.get(0);
- assertEquals(OverflowTestUtils.dataType1, chunkMetaData.getTsDataType());
- assertEquals(OverflowTestUtils.measurementId1, chunkMetaData.getMeasurementUid());
- assertEquals(originlength, insertFile.length());
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowTestUtils.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowTestUtils.java
deleted file mode 100644
index 5b40870..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowTestUtils.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.io;
-
-import java.io.IOException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-public class OverflowTestUtils {
-
- public static String deviceId1 = "d1";
- public static String deviceId2 = "d2";
- public static String measurementId1 = "s1";
- public static String measurementId2 = "s2";
- public static TSDataType dataType1 = TSDataType.INT32;
- public static TSDataType dataType2 = TSDataType.FLOAT;
- private static FileSchema fileSchema = new FileSchema();
-
- static {
- fileSchema
- .registerMeasurement(new MeasurementSchema(measurementId1, dataType1, TSEncoding.PLAIN));
- fileSchema
- .registerMeasurement(new MeasurementSchema(measurementId2, dataType2, TSEncoding.PLAIN));
- }
-
- public static FileSchema getFileSchema() {
- return fileSchema;
- }
-
- public static void produceInsertData(OverflowMemtable support) {
- support.insert(getData(deviceId1, measurementId1, dataType1, String.valueOf(1), 1));
- support.insert(getData(deviceId1, measurementId1, dataType1, String.valueOf(3), 3));
- support.insert(getData(deviceId1, measurementId1, dataType1, String.valueOf(2), 2));
-
- support.insert(getData(deviceId2, measurementId2, dataType2, String.valueOf(5.5f), 1));
- support.insert(getData(deviceId2, measurementId2, dataType2, String.valueOf(5.5f), 2));
- support.insert(getData(deviceId2, measurementId2, dataType2, String.valueOf(10.5f), 2));
- }
-
- private static TSRecord getData(String d, String m, TSDataType type, String value, long time) {
- TSRecord record = new TSRecord(time, d);
- record.addTuple(DataPoint.getDataPoint(type, m, value));
- return record;
- }
-
- public static void produceInsertData(OverflowProcessor processor) throws IOException {
-
- processor.insert(getData(deviceId1, measurementId1, dataType1, String.valueOf(1), 1));
- processor.insert(getData(deviceId1, measurementId1, dataType1, String.valueOf(3), 3));
- processor.insert(getData(deviceId1, measurementId1, dataType1, String.valueOf(2), 2));
-
- processor.insert(getData(deviceId2, measurementId2, dataType2, String.valueOf(5.5f), 1));
- processor.insert(getData(deviceId2, measurementId2, dataType2, String.valueOf(5.5f), 2));
- processor.insert(getData(deviceId2, measurementId2, dataType2, String.valueOf(10.5f), 2));
- }
-
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OFFileMetadataTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OFFileMetadataTest.java
deleted file mode 100644
index 715fe36..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OFFileMetadataTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.metadata;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class OFFileMetadataTest {
-
- private String path = "OFFileMetadataTest";
-
- @Before
- public void setUp() throws Exception {
-
- }
-
- @After
- public void tearDown() throws Exception {
- File file = new File(path);
- if (file.exists()) {
- file.delete();
- }
- }
-
- @Test
- public void testOFFileMetadata() throws Exception {
- OFFileMetadata ofFileMetadata = OverflowTestHelper.createOFFileMetadata();
- serialize(ofFileMetadata);
- OFFileMetadata deOFFileMetadata = deSerialize();
- // assert
- OverflowUtils.isOFFileMetadataEqual(ofFileMetadata, deOFFileMetadata);
- }
-
- private void serialize(OFFileMetadata ofFileMetadata) throws FileNotFoundException {
- FileOutputStream outputStream = new FileOutputStream(path);
- try {
- ofFileMetadata.serializeTo(outputStream);
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- if (outputStream != null) {
- try {
- outputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- private OFFileMetadata deSerialize() throws FileNotFoundException {
- FileInputStream inputStream = new FileInputStream(path);
- try {
- return OFFileMetadata.deserializeFrom(inputStream);
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- if (inputStream != null) {
- try {
- inputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return null;
- }
-}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OFRowGroupListMetadataTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OFRowGroupListMetadataTest.java
deleted file mode 100644
index 138a29e..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OFRowGroupListMetadataTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.metadata;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class OFRowGroupListMetadataTest {
-
- private String path = "OFRowGroupListMetadataTest";
-
- @Before
- public void setUp() throws Exception {
-
- }
-
- @After
- public void tearDown() throws Exception {
- File file = new File(path);
- if (file.exists()) {
- file.delete();
- }
- }
-
- @Test
- public void testOFRowGroupListMetadata() throws Exception {
- OFRowGroupListMetadata ofRowGroupListMetadata = OverflowTestHelper
- .createOFRowGroupListMetadata();
- serialize(ofRowGroupListMetadata);
- OFRowGroupListMetadata deOfRowGroupListMetadata = deSerialized();
- OverflowUtils.isOFRowGroupListMetadataEqual(ofRowGroupListMetadata, deOfRowGroupListMetadata);
- }
-
- private void serialize(OFRowGroupListMetadata ofRowGroupListMetadata)
- throws FileNotFoundException {
- FileOutputStream outputStream = new FileOutputStream(path);
- try {
- ofRowGroupListMetadata.serializeTo(outputStream);
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- if (outputStream != null) {
- try {
- outputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- private OFRowGroupListMetadata deSerialized() throws FileNotFoundException {
- FileInputStream inputStream = new FileInputStream(path);
- try {
- OFRowGroupListMetadata ofRowGroupListMetadata = OFRowGroupListMetadata
- .deserializeFrom(inputStream);
- return ofRowGroupListMetadata;
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- if (inputStream != null) {
- try {
- inputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return null;
- }
-}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OFSeriesListMetadataTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OFSeriesListMetadataTest.java
deleted file mode 100644
index 7216731..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OFSeriesListMetadataTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.metadata;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class OFSeriesListMetadataTest {
-
- private final String path = "OFSeriesListMetadataTest";
-
- @Before
- public void setUp() throws Exception {
-
- }
-
- @After
- public void tearDown() throws Exception {
- File file = new File(path);
- if (file.exists()) {
- file.delete();
- }
- }
-
- @Test
- public void testOfSeriesListMetadataSerDe() throws Exception {
- OFSeriesListMetadata ofSeriesListMetadata = OverflowTestHelper.createOFSeriesListMetadata();
- serialized(ofSeriesListMetadata);
- OFSeriesListMetadata deOfSeriesListMetadata = deSerialized();
- // assert
- OverflowUtils.isOFSeriesListMetadataEqual(ofSeriesListMetadata, deOfSeriesListMetadata);
- }
-
- private void serialized(OFSeriesListMetadata obj) throws FileNotFoundException {
- FileOutputStream fileOutputStream = new FileOutputStream(path);
- try {
- obj.serializeTo(fileOutputStream);
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- try {
- fileOutputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- private OFSeriesListMetadata deSerialized() throws FileNotFoundException {
- FileInputStream fileInputStream = new FileInputStream(path);
- try {
- OFSeriesListMetadata ofSeriesListMetadata = OFSeriesListMetadata
- .deserializeFrom(fileInputStream);
- return ofSeriesListMetadata;
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- try {
- fileInputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- return null;
- }
-}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OverflowTestHelper.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OverflowTestHelper.java
deleted file mode 100644
index 9b66886..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OverflowTestHelper.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.metadata;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.TsDigest;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-
-public class OverflowTestHelper {
-
- public static final String MEASUREMENT_UID = "sensor231";
- public static final long FILE_OFFSET = 2313424242L;
- public static final long NUM_OF_POINTS = 123456L;
- public static final long START_TIME = 523372036854775806L;
- public static final long END_TIME = 523372036854775806L;
- public static final TSDataType DATA_TYPE = TSDataType.INT64;
- private static String deviceId = "device";
-
- public static ChunkMetaData createSimpleTimeSeriesChunkMetaData() {
- ChunkMetaData metaData = new ChunkMetaData(MEASUREMENT_UID, DATA_TYPE, FILE_OFFSET, START_TIME,
- END_TIME// ,
- // ChunkMetaDataTest.ENCODING_TYPE
- );
- metaData.setNumOfPoints(NUM_OF_POINTS);
- metaData.setDigest(new TsDigest());
- return metaData;
- }
-
- public static List<ChunkMetaData> createChunkMetaDataList(int count) {
- List<ChunkMetaData> ret = new ArrayList<>();
- for (int i = 0; i < count; i++) {
- ret.add(createSimpleTimeSeriesChunkMetaData());
- }
- return ret;
- }
-
- public static OFSeriesListMetadata createOFSeriesListMetadata() {
- OFSeriesListMetadata ofSeriesListMetadata = new OFSeriesListMetadata(MEASUREMENT_UID,
- createChunkMetaDataList(5));
- return ofSeriesListMetadata;
- }
-
- public static OFRowGroupListMetadata createOFRowGroupListMetadata() {
- OFRowGroupListMetadata ofRowGroupListMetadata = new OFRowGroupListMetadata(deviceId);
- for (int i = 0; i < 5; i++) {
- ofRowGroupListMetadata.addSeriesListMetaData(createOFSeriesListMetadata());
- }
- return ofRowGroupListMetadata;
- }
-
- public static OFFileMetadata createOFFileMetadata() {
- OFFileMetadata ofFileMetadata = new OFFileMetadata();
- ofFileMetadata.setLastFooterOffset(100);
- for (int i = 0; i < 5; i++) {
- ofFileMetadata.addRowGroupListMetaData(createOFRowGroupListMetadata());
- }
- return ofFileMetadata;
- }
-
- public static List<String> getJSONArray() {
- List<String> jsonMetaData = new ArrayList<String>();
- jsonMetaData.add("fsdfsfsd");
- jsonMetaData.add("424fd");
- return jsonMetaData;
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OverflowUtils.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OverflowUtils.java
deleted file mode 100644
index a60f9e1..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/metadata/OverflowUtils.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.metadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-
-public class OverflowUtils {
-
- /**
- * when one of A and B is Null, A != B, so test case fails.
- *
- * @param objectA
- * @param objectB
- * @param name
- * @return false - A and B both are NULL, so we do not need to check whether their members are equal true - A and B
- * both are not NULL, so we need to check their members
- */
- public static boolean isTwoObjectsNotNULL(Object objectA, Object objectB, String name) {
- if ((objectA == null) && (objectB == null)) {
- return false;
- }
- if ((objectA == null) ^ (objectB == null)) {
- fail(String.format("one of %s is null", name));
- }
- return true;
- }
-
- public static void isOFSeriesListMetadataEqual(OFSeriesListMetadata ofSeriesListMetadata1,
- OFSeriesListMetadata ofSeriesListMetadata2) {
- if (isTwoObjectsNotNULL(ofSeriesListMetadata1, ofSeriesListMetadata2, "OFSeriesListMetadata")) {
- if (isTwoObjectsNotNULL(ofSeriesListMetadata1.getMeasurementId(),
- ofSeriesListMetadata2.getMeasurementId(),
- "measurement id")) {
- assertTrue(ofSeriesListMetadata1.getMeasurementId()
- .equals(ofSeriesListMetadata2.getMeasurementId()));
- }
- assertEquals(ofSeriesListMetadata1.getMetaDatas().size(),
- ofSeriesListMetadata2.getMetaDatas().size());
- List<ChunkMetaData> chunkMetaDataList1 = ofSeriesListMetadata1.getMetaDatas();
- List<ChunkMetaData> chunkMetaDataList2 = ofSeriesListMetadata2.getMetaDatas();
- for (int i = 0; i < chunkMetaDataList1.size(); i++) {
- isTimeSeriesChunkMetadataEqual(chunkMetaDataList1.get(i), chunkMetaDataList2.get(i));
- }
- }
- }
-
- public static void isTimeSeriesChunkMetadataEqual(ChunkMetaData metadata1,
- ChunkMetaData metadata2) {
- if (isTwoObjectsNotNULL(metadata1, metadata2, "ChunkMetaData")) {
- if (isTwoObjectsNotNULL(metadata1.getMeasurementUid(), metadata2.getMeasurementUid(),
- "sensorUID")) {
- assertTrue(metadata1.getMeasurementUid().equals(metadata2.getMeasurementUid()));
- }
- assertTrue(metadata1.getOffsetOfChunkHeader() == metadata2.getOffsetOfChunkHeader());
- assertTrue(metadata1.getNumOfPoints() == metadata2.getNumOfPoints());
- assertTrue(metadata1.getStartTime() == metadata2.getStartTime());
- assertTrue(metadata1.getEndTime() == metadata2.getEndTime());
- if (isTwoObjectsNotNULL(metadata1.getDigest(), metadata2.getDigest(), "digest")) {
- isMapBufferEqual(metadata1.getDigest().getStatistics(),
- metadata2.getDigest().getStatistics(),
- "statistics");
- }
- }
- }
-
- public static void isMapBufferEqual(Map<String, ByteBuffer> mapA, Map<String, ByteBuffer> mapB,
- String name) {
- if ((mapA == null) ^ (mapB == null)) {
- System.out.println("error");
- fail(String.format("one of %s is null", name));
- }
- if ((mapA != null) && (mapB != null)) {
- if (mapA.size() != mapB.size()) {
- fail(String.format("%s size is different", name));
- }
- for (String key : mapB.keySet()) {
- ByteBuffer b = mapB.get(key);
- ByteBuffer a = mapA.get(key);
- assertTrue(b.equals(a));
- }
- }
- }
-
- public static void isOFRowGroupListMetadataEqual(OFRowGroupListMetadata ofRowGroupListMetadata1,
- OFRowGroupListMetadata ofRowGroupListMetadata2) {
- if (isTwoObjectsNotNULL(ofRowGroupListMetadata1, ofRowGroupListMetadata2,
- "OFRowGroupListMetadata")) {
- assertTrue(
- ofRowGroupListMetadata1.getdeviceId().equals(ofRowGroupListMetadata2.getdeviceId()));
- List<OFSeriesListMetadata> list1 = ofRowGroupListMetadata1.getSeriesList();
- List<OFSeriesListMetadata> list2 = ofRowGroupListMetadata2.getSeriesList();
- assertEquals(list1.size(), list2.size());
- for (int i = 0; i < list1.size(); i++) {
- isOFSeriesListMetadataEqual(list1.get(i), list2.get(i));
- }
- }
- }
-
- public static void isOFFileMetadataEqual(OFFileMetadata ofFileMetadata1,
- OFFileMetadata ofFileMetadata2) {
- if (isTwoObjectsNotNULL(ofFileMetadata1, ofFileMetadata2, "OFFileMetadata")) {
- assertEquals(ofFileMetadata1.getLastFooterOffset(), ofFileMetadata2.getLastFooterOffset());
- List<OFRowGroupListMetadata> list1 = ofFileMetadata1.getRowGroupLists();
- List<OFRowGroupListMetadata> list2 = ofFileMetadata2.getRowGroupLists();
- assertNotNull(list1);
- assertNotNull(list2);
- assertEquals(list1.size(), list2.size());
- for (int i = 0; i < list1.size(); i++) {
- isOFRowGroupListMetadataEqual(list1.get(i), list2.get(i));
- }
- }
- }
-
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessorTest.java
index c30ecc4..6e3cfa7 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessorTest.java
@@ -20,26 +20,24 @@
package org.apache.iotdb.db.engine.overflowdata;
import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.cleanEnv;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.envSetUp;
import java.io.IOException;
-import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.datasource.SeriesDataSource;
import org.apache.iotdb.db.engine.sgmanager.OperationResult;
import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessorTest;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.exception.StorageGroupManagerException;
-import org.apache.iotdb.db.exception.FileNodeProcessorException;
-import org.apache.iotdb.db.exception.MetadataArgsErrorException;
-import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
import org.apache.iotdb.db.utils.ImmediateFuture;
+import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -51,19 +49,20 @@ public class OverflowProcessorTest extends TsFileProcessorTest {
IoTDBDescriptor.getInstance().getConfig().setEnableWal(true);
super.setUp();
processor.close();
- processor = new OverflowProcessor("root.test", doNothingAction, doNothingAction, doNothingAction,
- SysTimeVersionController.INSTANCE, schema);
-
+ processor = new OverflowProcessor("root.test", SysTimeVersionController.INSTANCE,
+ schema);
}
@After
public void tearDown() throws Exception {
super.tearDown();
+ cleanEnv();
}
@Test
public void insert()
- throws BufferWriteProcessorException, IOException, ExecutionException, InterruptedException, FileNodeProcessorException, StorageGroupManagerException, PathErrorException, MetadataArgsErrorException {
+ throws IOException, ExecutionException, InterruptedException,
+ TsFileProcessorException {
String[] s1 = new String[]{"s1"};
String[] s2 = new String[]{"s2"};
String[] value = new String[]{"5.0"};
@@ -100,12 +99,16 @@ public class OverflowProcessorTest extends TsFileProcessorTest {
processor.delete("root.test.d1", "s1", 8);
processor.delete("root.test.d3", "s1", 8);
- QueryExpression qe = QueryExpression.create(
- Collections.singletonList(new Path("root.test.d1", "s1")), null);
- QueryDataSet result = queryManager.query(qe, processor, TEST_QUERY_CONTEXT);
- while (result.hasNext()) {
- RowRecord record = result.next();
- System.out.println(record.getTimestamp() + "," + record.getFields().get(0).getFloatV());
+ SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(
+ new Path("root.test.d1", "s1"), null);
+ SeriesDataSource dataSource = processor.query(singleSeriesExpression, TEST_QUERY_CONTEXT);
+ SequenceDataReader dataReader = new SequenceDataReader(dataSource, null, TEST_QUERY_CONTEXT);
+ while (dataReader.hasNext()) {
+ BatchData batch = dataReader.nextBatch();
+ while (batch.hasNext()) {
+ System.out.println(batch.currentTime() +"," + batch.getFloat());
+ batch.next();
+ }
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
index 25ab9af..866fb89 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.engine.tsfiledata;
import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
+import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Collections;
@@ -28,29 +29,24 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.ActionException;
+import org.apache.iotdb.db.engine.datasource.SeriesDataSource;
import org.apache.iotdb.db.engine.sgmanager.OperationResult;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
-import org.apache.iotdb.db.exception.FileNodeProcessorException;
-import org.apache.iotdb.db.exception.MetadataArgsErrorException;
-import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.executor.EngineQueryRouter;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.ImmediateFuture;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
@@ -63,18 +59,12 @@ import org.slf4j.LoggerFactory;
public class TsFileProcessorTest {
private static Logger LOGGER = LoggerFactory.getLogger(TsFileProcessorTest.class);
protected TsFileProcessor processor;
- protected MManager mManager;
- protected EngineQueryRouter queryManager;
- protected Action doNothingAction = new Action() {
- @Override
- public void act() throws ActionException {
- }
- };
- protected Map<String, MeasurementSchema> measurementSchemaMap = new HashMap<>();
+ private MManager mManager;
+ private Map<String, MeasurementSchema> measurementSchemaMap = new HashMap<>();
protected FileSchema schema;
- protected long oldBufferwriteFileSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold();
+ private long oldTsFileSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold();
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
@@ -82,18 +72,24 @@ public class TsFileProcessorTest {
IoTDBDescriptor.getInstance().getConfig().setEnableWal(true);
IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(2*1024*1024);
mManager = MManager.getInstance();
- queryManager = new EngineQueryRouter();
- measurementSchemaMap.put("s1", new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
- measurementSchemaMap.put("s2", new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
- measurementSchemaMap.put("s3", new MeasurementSchema("s3", TSDataType.FLOAT, TSEncoding.RLE));
+ measurementSchemaMap.put("s1", new MeasurementSchema("s1", TSDataType.FLOAT,
+ TSEncoding.RLE));
+ measurementSchemaMap.put("s2", new MeasurementSchema("s2", TSDataType.FLOAT,
+ TSEncoding.RLE));
+ measurementSchemaMap.put("s3", new MeasurementSchema("s3", TSDataType.FLOAT,
+ TSEncoding.RLE));
schema = new FileSchema(measurementSchemaMap);
- processor = new TsFileProcessor("root.test", doNothingAction, doNothingAction, doNothingAction,
- SysTimeVersionController.INSTANCE, schema);
+ processor = new TsFileProcessor("root.test", SysTimeVersionController.INSTANCE,
+ schema);
mManager.setStorageLevelToMTree("root.test");
- mManager.addPathToMTree("root.test.d1.s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
- mManager.addPathToMTree("root.test.d2.s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
- mManager.addPathToMTree("root.test.d1.s2", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
- mManager.addPathToMTree("root.test.d2.s2", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
+ mManager.addPathToMTree("root.test.d1.s1", TSDataType.FLOAT, TSEncoding.RLE,
+ CompressionType.SNAPPY, Collections.emptyMap());
+ mManager.addPathToMTree("root.test.d2.s1", TSDataType.FLOAT, TSEncoding.RLE,
+ CompressionType.SNAPPY, Collections.emptyMap());
+ mManager.addPathToMTree("root.test.d1.s2", TSDataType.FLOAT, TSEncoding.RLE,
+ CompressionType.SNAPPY, Collections.emptyMap());
+ mManager.addPathToMTree("root.test.d2.s2", TSDataType.FLOAT, TSEncoding.RLE,
+ CompressionType.SNAPPY, Collections.emptyMap());
}
@@ -104,20 +100,24 @@ public class TsFileProcessorTest {
processor.removeMe();
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
- IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(oldBufferwriteFileSizeThreshold);
+ IoTDBDescriptor.getInstance().getConfig().
+ setBufferwriteFileSizeThreshold(oldTsFileSizeThreshold);
}
@Test
public void insert()
- throws BufferWriteProcessorException, IOException, ExecutionException, InterruptedException, FileNodeProcessorException, StorageGroupManagerException, PathErrorException, MetadataArgsErrorException {
+ throws IOException, ExecutionException, InterruptedException, TsFileProcessorException {
String[] s1 = new String[]{"s1"};
String[] s2 = new String[]{"s2"};
String[] value = new String[]{"5.0"};
- ;
- Assert.assertEquals(
- OperationResult.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 10, s1, value)));
- Assert.assertEquals(OperationResult.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 10, s2, value)));
- Assert.assertEquals(OperationResult.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
+
+ assertEquals(
+ OperationResult.WRITE_SUCCESS,
+ processor.insert(new InsertPlan("root.test.d1", 10, s1, value)));
+ assertEquals(OperationResult.WRITE_SUCCESS,
+ processor.insert(new InsertPlan("root.test.d1", 10, s2, value)));
+ assertEquals(OperationResult.WRITE_SUCCESS,
+ processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
Future<Boolean> ok = processor.flush();
ok.get();
ok = processor.flush();
@@ -128,100 +128,105 @@ public class TsFileProcessorTest {
ok.get();
//let's rewrite timestamp =12 again..
- Assert.assertEquals(OperationResult.WRITE_REJECT_BY_TIME, processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
+ assertEquals(OperationResult.WRITE_REJECT_BY_TIME,
+ processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
processor.delete("root.test.d1", "s1",12);
- Assert.assertEquals(OperationResult.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
- Assert.assertEquals(OperationResult.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 13, s1, value)));
- Assert.assertEquals(OperationResult.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d2", 10, s1, value)));
- Assert.assertEquals(OperationResult.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 14, s1, value)));
+ assertEquals(OperationResult.WRITE_SUCCESS,
+ processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
+ assertEquals(OperationResult.WRITE_SUCCESS,
+ processor.insert(new InsertPlan("root.test.d1", 13, s1, value)));
+ assertEquals(OperationResult.WRITE_SUCCESS,
+ processor.insert(new InsertPlan("root.test.d2", 10, s1, value)));
+ assertEquals(OperationResult.WRITE_SUCCESS,
+ processor.insert(new InsertPlan("root.test.d1", 14, s1, value)));
processor.delete("root.test.d1", "s1",12);
processor.delete("root.test.d3", "s1",12);
+ executeQuery("root.test.d1", "s1", TEST_QUERY_CONTEXT, 2);
+ }
- QueryExpression qe = QueryExpression.create(Collections.singletonList(new Path("root.test.d1", "s1")), null);
- QueryDataSet result = queryManager.query(qe, processor, TEST_QUERY_CONTEXT);
- while (result.hasNext()) {
- RowRecord record = result.next();
- System.out.println(record.getTimestamp() +"," + record.getFields().get(0).getFloatV());
+ private void executeQuery(String deviceId, String measurementId, QueryContext context,
+ int expectedCnt)
+ throws IOException {
+ SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(
+ new Path(deviceId, measurementId), null);
+ SeriesDataSource dataSource = processor.query(singleSeriesExpression, context);
+ SequenceDataReader dataReader = new SequenceDataReader(dataSource, null, context);
+ int i = 0;
+ while (dataReader.hasNext()) {
+ BatchData batch = dataReader.nextBatch();
+ while (batch.hasNext()) {
+ //System.out.println(batch.currentTime() +"," + batch.getFloat());
+ batch.next();
+ i ++;
+ }
+ }
+ if (expectedCnt >= 0) {
+ assertEquals(expectedCnt, i);
}
}
@Test
- public void bruteForceTest() throws InterruptedException, StorageGroupManagerException, IOException {
+ public void bruteForceTest() throws InterruptedException, IOException {
final boolean[] exception = {false, false, false};
final boolean[] goon = {true};
- int totalsize = 50000;
+ int totalSize = 50000;
final int[] count = {0};
- QueryExpression qe = QueryExpression.create(Collections.singletonList(new Path("root.test.d1", "s1")), null);
- Thread insertThread = new Thread() {
- @Override
- public void run() {
- int i =0;
- long time = 100L;
- long start = System.currentTimeMillis();
- String[] sensors = new String[]{"s1"};
- String[] values = new String[1];
- try {
- for (int j = 0; j < totalsize && goon[0]; j++) {
- processor.lock(true);
+ Thread insertThread = new Thread(() -> {
+ long time = 100L;
+ long start = System.currentTimeMillis();
+ String[] sensors = new String[]{"s1"};
+ String[] values = new String[1];
+ try {
+ for (int j = 0; j < totalSize && goon[0]; j++) {
+ processor.lock(true);
// processor.insert("root.test.d1","s1", time++, String.valueOf(j));
// processor.insert("root.test.d2","s1", time++, String.valueOf(j));
- values[0] = String.valueOf(j);
- processor.insert(new InsertPlan("root.test.d1", time++, sensors, values));
- processor.insert(new InsertPlan("root.test.d2", time++, sensors, values));
- processor.writeUnlock();
- count[0]++;
- }
- System.out.println((System.currentTimeMillis() - start));
- } catch (BufferWriteProcessorException | IOException e) {
- // we will break out.
- LOGGER.error(e.getMessage());
- exception[0] = true;
+ values[0] = String.valueOf(j);
+ processor.insert(new InsertPlan("root.test.d1", time++, sensors, values));
+ processor.insert(new InsertPlan("root.test.d2", time++, sensors, values));
+ processor.writeUnlock();
+ count[0]++;
}
+ System.out.println((System.currentTimeMillis() - start));
+ } catch (TsFileProcessorException e) {
+ LOGGER.error("", e);
+ exception[0] = true;
}
- };
- Thread flushThread = new Thread() {
- @Override
- public void run() {
- try {
- for (int j = 0; j < totalsize * 2 && goon[0]; j++) {
- processor.lock(true);
- processor.flush();
- processor.writeUnlock();
- }
- } catch (IOException e) {
- // we will break out.
- LOGGER.error(e.getMessage());
- exception[1] = true;
+ });
+ Thread flushThread = new Thread(() -> {
+ try {
+ for (int j = 0; j < totalSize * 2 && goon[0]; j++) {
+ processor.lock(true);
+ processor.flush();
+ processor.writeUnlock();
}
+ } catch (IOException e) {
+ // we will break out.
+ LOGGER.error(e.getMessage());
+ exception[1] = true;
}
- };
+ });
//we temporary disable the query because there are bugs..
- Thread queryThread = new Thread() {
- @Override
- public void run() {
- try {
- for (int j = 0; j < totalsize * 2 && goon[0]; j++) {
- processor.lock(false);
- QueryContext context = new QueryContext(QueryResourceManager.getInstance().assignJobId());
- QueryDataSet result = queryManager.query(qe, processor, context);
- while (result.hasNext()) {
- result.next();
- }
- QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
- processor.readUnlock();
- }
- } catch (IOException | StorageGroupManagerException e) {
- // we will break out.
- LOGGER.error(e.getMessage());
- exception[2] = true;
+ Thread queryThread = new Thread(() -> {
+ try {
+ for (int j = 0; j < totalSize * 2 && goon[0]; j++) {
+ processor.lock(false);
+ QueryContext context = new QueryContext(QueryResourceManager.getInstance().assignJobId());
+ executeQuery("root.test.d1", "s1", context, -1);
+ QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
+ processor.readUnlock();
}
+ } catch (IOException | StorageGroupManagerException e) {
+ // we will break out.
+ LOGGER.error(e.getMessage());
+ exception[2] = true;
}
- };
+ });
flushThread.start();
insertThread.start();
queryThread.start();
@@ -237,13 +242,7 @@ public class TsFileProcessorTest {
this.wait(50);
}
}
- QueryDataSet result = queryManager.query(qe, processor, TEST_QUERY_CONTEXT);
- int size =0;
- while (result.hasNext()) {
- RowRecord record = result.next();
- size ++;
- }
- //Assert.assertEquals(count[0], size);
+ executeQuery("root.test.d1", "s1", TEST_QUERY_CONTEXT, 50000);
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java
index d701c7a..15bfe96 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java
@@ -21,16 +21,19 @@ package org.apache.iotdb.db.integration;
import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
+import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Collections;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.DatabaseEngine;
-import org.apache.iotdb.db.exception.StorageGroupManagerException;
+import org.apache.iotdb.db.engine.DatabaseEngine;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.exception.MetadataArgsErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.executor.EngineQueryRouter;
@@ -41,18 +44,16 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class QueryDataFromUnclosedTsFileIT {
- long bufferWriteFileSize;
- DatabaseEngine sgManager;
- MManager mManager;
- EngineQueryRouter queryManager;
+ private long bufferWriteFileSize;
+ private DatabaseEngine sgManager;
+ private MManager mManager;
+ private EngineQueryRouter queryManager;
@Before
public void setUp() throws IOException, StorageGroupManagerException, StartupException {
EnvironmentUtils.cleanEnv();
@@ -60,15 +61,14 @@ public class QueryDataFromUnclosedTsFileIT {
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
bufferWriteFileSize = IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold();
- //IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(100);
- sgManager = DatabaseEngine.getInstance();
+ sgManager = DatabaseEngineFactory.getCurrent();
mManager = MManager.getInstance();
queryManager = new EngineQueryRouter();
}
@After
public void tearDown() throws StorageGroupManagerException, IOException {
- IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(bufferWriteFileSize);;
+ IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(bufferWriteFileSize);
EnvironmentUtils.cleanEnv();
@@ -78,25 +78,29 @@ public class QueryDataFromUnclosedTsFileIT {
public void test()
throws StorageGroupManagerException, IOException, PathErrorException, MetadataArgsErrorException {
mManager.setStorageLevelToMTree("root.test");
- mManager.addPathToMTree("root.test.d1.s1", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
- mManager.addPathToMTree("root.test.d2.s1", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
- sgManager.addTimeSeries(new Path("root.test.d1", "s1"), TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY, Collections
+ mManager.addPathToMTree("root.test.d1.s1", TSDataType.INT32, TSEncoding.RLE,
+ CompressionType.SNAPPY, Collections.emptyMap());
+ mManager.addPathToMTree("root.test.d2.s1", TSDataType.INT32, TSEncoding.RLE,
+ CompressionType.SNAPPY, Collections.emptyMap());
+ sgManager.addTimeSeries(new Path("root.test.d1", "s1"), TSDataType.INT32,
+ TSEncoding.RLE, CompressionType.SNAPPY, Collections
.emptyMap());
- sgManager.addTimeSeries(new Path("root.test.d2", "s1"), TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY, Collections
+ sgManager.addTimeSeries(new Path("root.test.d2", "s1"), TSDataType.INT32,
+ TSEncoding.RLE, CompressionType.SNAPPY, Collections
.emptyMap());
- long time = System.currentTimeMillis();
for (int i=0; i < 20000; i++) {
- sgManager.insert(new TSRecord(i, "root.test.d1").addTuple(new IntDataPoint("s1", i)), false);
- sgManager.insert(new TSRecord(i, "root.test.d2").addTuple(new IntDataPoint("s1", i)), false);
+ sgManager.insert(new InsertPlan("root.test.d1", i, "s1", String.valueOf(i)), false);
+ sgManager.insert(new InsertPlan("root.test.d2", i, "s1", String.valueOf(i)), false);
}
QueryExpression qe = QueryExpression
.create(Collections.singletonList(new Path("root.test.d1", "s1")), null);
QueryDataSet result = queryManager.query(qe, TEST_QUERY_CONTEXT);
+ int cnt = 0;
while (result.hasNext()) {
result.next();
- //System.out.println(record.getTimestamp() + "," + record.getFields().get(0).getIntV());
+ cnt ++;
}
-
+ assertEquals(20000, cnt);
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/monitor/MonitorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/monitor/MonitorTest.java
index f02381d..5b382d7 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/monitor/MonitorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/monitor/MonitorTest.java
@@ -28,14 +28,14 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.DatabaseEngine;
+import org.apache.iotdb.db.engine.DatabaseEngine;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.MonitorConstants.FileSizeConstants;
import org.apache.iotdb.db.monitor.collector.FileSize;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -64,7 +64,7 @@ public class MonitorTest {
@Test
public void testDatabaseEngineMonitorAndAddMetadata() {
- DatabaseEngine dbEngine = DatabaseEngine.getInstance();
+ DatabaseEngine dbEngine = DatabaseEngineFactory.getCurrent();
FileSize fileSize = FileSize.getInstance();
statMonitor = StatMonitor.getInstance();
statMonitor.registerStatStorageGroup();
@@ -103,21 +103,21 @@ public class MonitorTest {
// Get stat data and test right
- Map<String, TSRecord> statHashMap = dbEngine.getAllStatisticsValue();
- Map<String, TSRecord> fileSizeStatMap = fileSize.getAllStatisticsValue();
+ Map<String, InsertPlan> statHashMap = dbEngine.getAllStatisticsValue();
+ Map<String, InsertPlan> fileSizeStatMap = fileSize.getAllStatisticsValue();
String path = dbEngine.getAllPathForStatistic().get(0);
String fileSizeStatPath = fileSize.getAllPathForStatistic().get(0);
int pos = path.lastIndexOf('.');
int fileSizeStatPos = fileSizeStatPath.lastIndexOf('.');
- TSRecord fTSRecord = statHashMap.get(path.substring(0, pos));
- TSRecord fileSizeRecord = fileSizeStatMap.get(fileSizeStatPath.substring(0, fileSizeStatPos));
+ InsertPlan fPlan = statHashMap.get(path.substring(0, pos));
+ InsertPlan fileSizePlan = fileSizeStatMap.get(fileSizeStatPath.substring(0, fileSizeStatPos));
- assertNotEquals(null, fTSRecord);
- assertNotEquals(null, fileSizeRecord);
- for (DataPoint dataPoint : fTSRecord.dataPointList) {
- String m = dataPoint.getMeasurementId();
- Long v = (Long) dataPoint.getValue();
+ assertNotEquals(null, fPlan);
+ assertNotEquals(null, fileSizePlan);
+ for (int i = 0; i < fPlan.getMeasurements().length; i++) {
+ String m = fPlan.getMeasurements()[i];
+ Long v = Long.parseLong(fPlan.getValues()[i]);
if (m.equals("TOTAL_REQ_SUCCESS")) {
assertEquals(v, new Long(0));
}
@@ -129,9 +129,9 @@ public class MonitorTest {
assertEquals(v, new Long(0));
}
}
- for (DataPoint dataPoint : fileSizeRecord.dataPointList) {
- String m = dataPoint.getMeasurementId();
- Long v = (Long) dataPoint.getValue();
+ for (int i = 0; i < fileSizePlan.getMeasurements().length; i++) {
+ String m = fileSizePlan.getMeasurements()[i];
+ Long v = Long.parseLong(fileSizePlan.getValues()[i]);
if (m.equals(FileSizeConstants.OVERFLOW.name())) {
assertEquals(v, new Long(0));
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/LogicalPlanSmallTest.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/LogicalPlanSmallTest.java
index f8786a9..b9eec27 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/LogicalPlanSmallTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/LogicalPlanSmallTest.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.exception.qp.LogicalOptimizeException;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.qp.logical.RootOperator;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
-import org.apache.iotdb.db.qp.logical.crud.SFWOperator;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.strategy.LogicalGenerator;
import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
import org.apache.iotdb.db.qp.utils.MemIntQpExecutor;
@@ -66,7 +66,7 @@ public class LogicalPlanSmallTest {
AstNode astNode = ParseUtils.findRootNonNullToken(astTree);
RootOperator operator = generator.getLogicalPlan(astNode);
Assert.assertEquals(operator.getClass(), QueryOperator.class);
- Assert.assertEquals(((QueryOperator) operator).getSeriesLimit(), 10);
+ Assert.assertEquals(10, ((QueryOperator) operator).getSeriesLimit());
}
@Test(expected = LogicalOperatorException.class)
@@ -81,7 +81,7 @@ public class LogicalPlanSmallTest {
"parsing error,statement: " + sqlStr + " .message:" + e.getMessage());
}
AstNode astNode = ParseUtils.findRootNonNullToken(astTree);
- RootOperator operator = generator.getLogicalPlan(astNode);
+ generator.getLogicalPlan(astNode);
// expected to throw LogicalOperatorException: SLIMIT <SN>: SN should be Int32.
}
@@ -97,7 +97,7 @@ public class LogicalPlanSmallTest {
"parsing error,statement: " + sqlStr + " .message:" + e.getMessage());
}
AstNode astNode = ParseUtils.findRootNonNullToken(astTree);
- RootOperator operator = generator.getLogicalPlan(astNode);
+ generator.getLogicalPlan(astNode);
// expected to throw LogicalOperatorException: SLIMIT <SN>: SN must be a positive integer and can not be zero.
}
@@ -115,8 +115,8 @@ public class LogicalPlanSmallTest {
AstNode astNode = ParseUtils.findRootNonNullToken(astTree);
RootOperator operator = generator.getLogicalPlan(astNode);
Assert.assertEquals(operator.getClass(), QueryOperator.class);
- Assert.assertEquals(((QueryOperator) operator).getSeriesLimit(), 10);
- Assert.assertEquals(((QueryOperator) operator).getSeriesOffset(), 1);
+ Assert.assertEquals(10, ((QueryOperator) operator).getSeriesLimit());
+ Assert.assertEquals(1, ((QueryOperator) operator).getSeriesOffset());
}
@Test(expected = LogicalOptimizeException.class)
@@ -146,12 +146,12 @@ public class LogicalPlanSmallTest {
Path path4 = new Path(
new StringContainer(new String[]{"root", "vehicle", "d4", "s1"},
SystemConstant.PATH_SEPARATOR));
- executor.insert(path1, 10, "10");
- executor.insert(path2, 10, "10");
- executor.insert(path3, 10, "10");
- executor.insert(path4, 10, "10");
+ executor.insert(new InsertPlan(path1.getDevice(), 10, path1.getMeasurement(), "10"));
+ executor.insert(new InsertPlan(path2.getDevice(), 10, path2.getMeasurement(), "10"));
+ executor.insert(new InsertPlan(path3.getDevice(), 10, path3.getMeasurement(), "10"));
+ executor.insert(new InsertPlan(path4.getDevice(), 10, path4.getMeasurement(), "10"));
ConcatPathOptimizer concatPathOptimizer = new ConcatPathOptimizer(executor);
- operator = (SFWOperator) concatPathOptimizer.transform(operator);
+ concatPathOptimizer.transform(operator);
// expected to throw LogicalOptimizeException: Wrong use of SLIMIT: SLIMIT is not allowed to be used with
// complete paths.
}
@@ -168,7 +168,7 @@ public class LogicalPlanSmallTest {
"parsing error,statement: " + sqlStr + " .message:" + e.getMessage());
}
AstNode astNode = ParseUtils.findRootNonNullToken(astTree);
- RootOperator operator = generator.getLogicalPlan(astNode);
+ generator.getLogicalPlan(astNode);
// expected to throw LogicalOperatorException: LIMIT <N>: N should be Int32.
}
@@ -184,7 +184,7 @@ public class LogicalPlanSmallTest {
"parsing error,statement: " + sqlStr + " .message:" + e.getMessage());
}
AstNode astNode = ParseUtils.findRootNonNullToken(astTree);
- RootOperator operator = generator.getLogicalPlan(astNode);
+ generator.getLogicalPlan(astNode);
// expected to throw LogicalOperatorException: LIMIT <N>: N must be a positive integer and can not be zero.
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
index 5cbc14b..23d25e1 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
@@ -69,10 +70,14 @@ public class PhysicalPlanTest {
Path path4 = new Path(
new StringContainer(new String[]{"root", "vehicle", "d4", "s1"},
SystemConstant.PATH_SEPARATOR));
- processor.getExecutor().insert(path1, 10, "10");
- processor.getExecutor().insert(path2, 10, "10");
- processor.getExecutor().insert(path3, 10, "10");
- processor.getExecutor().insert(path4, 10, "10");
+ processor.getExecutor().insert(new InsertPlan(path1.getDevice(), 10, path1.getMeasurement(),
+ "10"));
+ processor.getExecutor().insert(new InsertPlan(path2.getDevice(), 10, path2.getMeasurement(),
+ "10"));
+ processor.getExecutor().insert(new InsertPlan(path3.getDevice(), 10, path3.getMeasurement(),
+ "10"));
+ processor.getExecutor().insert(new InsertPlan(path4.getDevice(), 10, path4.getMeasurement(),
+ "10"));
}
@Test
@@ -188,19 +193,19 @@ public class PhysicalPlanTest {
public void testFill3() {
String sqlStr = "SELECT s1 FROM root.vehicle.d1 WHERE time = 5000 Fill(int32[linear, 5m], boolean[previous])";
try {
- PhysicalPlan plan = processor.parseSQLToPhysicalPlan(sqlStr);
+ processor.parseSQLToPhysicalPlan(sqlStr);
} catch (Exception e) {
assertTrue(true);
}
}
@Test
- public void testFill4() throws QueryProcessorException, ArgsErrorException {
+ public void testFill4() {
String sqlStr = "SELECT s1 FROM root.vehicle.d1 WHERE time > 5000 Fill(int32[linear], boolean[previous])";
try {
- PhysicalPlan plan = processor.parseSQLToPhysicalPlan(sqlStr);
+ processor.parseSQLToPhysicalPlan(sqlStr);
} catch (Exception e) {
- assertEquals("Only \"=\" can be used in fill function", e.getMessage().toString());
+ assertEquals("Only \"=\" can be used in fill function", e.getMessage());
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index 99b6680..c254657 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.qp.utils;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -26,9 +25,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.TreeSet;
-
-import org.apache.iotdb.db.exception.StorageGroupManagerException;
-import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
@@ -38,7 +34,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.fill.IFill;
-import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -57,6 +52,8 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
// pathStr, TreeMap<time, value>
private Map<String, TestSeries> demoMemDataBase = new HashMap<>();
+ @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
+ // dummy timestamp container
private TreeSet<Long> timeStampUnion = new TreeSet<>();
private Map<String, List<String>> fakeAllPaths;
@@ -100,9 +97,8 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
return flag;
case INSERT:
InsertPlan insert = (InsertPlan) plan;
- int result = multiInsert(insert.getDeviceId(), insert.getTime(), insert.getMeasurements(),
- insert.getValues());
- return result == 0;
+ insert(insert);
+ return true;
default:
throw new UnsupportedOperationException();
}
@@ -110,24 +106,19 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
@Override
public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
- QueryContext context)
- throws ProcessorException, IOException, PathErrorException, StorageGroupManagerException,
- QueryFilterOptimizationException {
+ QueryContext context) {
return null;
}
@Override
public QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
- long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context)
- throws ProcessorException, IOException, PathErrorException, StorageGroupManagerException,
- QueryFilterOptimizationException {
+ long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context) {
return null;
}
@Override
public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
- QueryContext context)
- throws ProcessorException, IOException, PathErrorException, StorageGroupManagerException {
+ QueryContext context) {
return null;
}
@@ -179,15 +170,14 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
}
@Override
- public int insert(Path path, long insertTime, String value) {
- String strPath = path.toString();
+ public void insert(InsertPlan plan) {
+ String strPath = plan.getDeviceId() + "." + plan.getMeasurements()[0];
if (!demoMemDataBase.containsKey(strPath)) {
demoMemDataBase.put(strPath, new TestSeries());
}
- demoMemDataBase.get(strPath).data.put(insertTime, Integer.valueOf(value));
- timeStampUnion.add(insertTime);
- LOG.info("insert into {}:<{},{}>", path, insertTime, value);
- return 0;
+ demoMemDataBase.get(strPath).data.put(plan.getTime(), Integer.valueOf(plan.getValues()[0]));
+ timeStampUnion.add(plan.getTime());
+ LOG.info("insert into {}:<{},{}>", strPath, plan.getTime(), plan.getValues()[0]);
}
@Override
@@ -199,12 +189,6 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
};
}
- @Override
- public int multiInsert(String deviceId, long insertTime, String[] measurementList,
- String[] insertValues) {
- return 0;
- }
-
private class TestSeries {
public TreeMap<Long, Integer> data = new TreeMap<>();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 0c28697..435a2c8 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -26,20 +26,16 @@ import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.cache.RowGroupBlockMetaDataCache;
-import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
-import org.apache.iotdb.db.engine.filenode.DatabaseEngine;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
-import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +54,6 @@ public class EnvironmentUtils {
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static Directories directories = Directories.getInstance();
- private static TSFileConfig tsfileConfig = TSFileDescriptor.getInstance().getConfig();
public static long TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
public static QueryContext TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
@@ -70,10 +65,10 @@ public class EnvironmentUtils {
// clear opened file streams
FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
- // tsFileConfig.duplicateIncompletedPage = false;
+ // tsFileConfig.duplicateIncompletePage = false;
// clean filenode manager
try {
- if (!DatabaseEngine.getInstance().deleteAll()) {
+ if (!DatabaseEngineFactory.getCurrent().deleteAll()) {
LOGGER.error("Can't close the filenode manager in EnvironmentUtils");
Assert.fail();
}
@@ -81,18 +76,15 @@ public class EnvironmentUtils {
throw new IOException(e);
}
StatMonitor.getInstance().close();
- DatabaseEngine.getInstance().resetDatabaseEngine();
+ DatabaseEngineFactory.getCurrent().reset();
// clean wal
MultiFileLogNodeManager.getInstance().stop();
- // clean cache
- TsFileMetaDataCache.getInstance().clear();
- RowGroupBlockMetaDataCache.getInstance().clear();
// close metadata
MManager.getInstance().clear();
MManager.getInstance().flushObjectToFile();
// delete all directory
cleanAllDir();
- // DatabaseEngine.getInstance().reset();
+ // DatabaseEngineFactory.getCurrent().reset();
// reset MemController
BasicMemController.getInstance().close();
}
@@ -107,7 +99,7 @@ public class EnvironmentUtils {
cleanDir(path);
}
// delete filenode
- cleanDir(config.getFileNodeDir());
+ cleanDir(config.getStorageGroupDir());
// delete metadata
cleanDir(config.getMetadataDir());
// delete wal
@@ -116,13 +108,13 @@ public class EnvironmentUtils {
cleanDir(config.getDerbyHome());
// delete index
cleanDir(config.getIndexFileDir());
- // delte data
+ // delete data
cleanDir("data");
- // delte derby log
+ // delete derby log
// cleanDir("derby.log");
}
- public static void cleanDir(String dir) throws IOException {
+ private static void cleanDir(String dir) throws IOException {
File file = new File(dir);
if (file.exists()) {
if (file.isDirectory()) {
@@ -157,7 +149,7 @@ public class EnvironmentUtils {
config.setEnableMemMonitor(false);
// disable the system monitor
config.setEnableStatMonitor(false);
- IAuthorizer authorizer = null;
+ IAuthorizer authorizer;
try {
authorizer = LocalFileAuthorizer.getInstance();
} catch (AuthException e) {
@@ -168,7 +160,7 @@ public class EnvironmentUtils {
} catch (AuthException e) {
throw new StartupException(e.getMessage());
}
- DatabaseEngine.getInstance().resetDatabaseEngine();
+ DatabaseEngineFactory.getCurrent().reset();
MultiFileLogNodeManager.getInstance().start();
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java
index 20d74ca..3f62423 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java
@@ -23,9 +23,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
-import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.MemUtils;
@@ -34,17 +32,13 @@ import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class IoTDBLogFileSizeTest {
- private IoTDB deamon;
-
- private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private TSFileConfig fileConfig = TSFileDescriptor.getInstance().getConfig();
+ private IoTDB daemon;
private boolean skip = true;
@@ -65,11 +59,11 @@ public class IoTDBLogFileSizeTest {
if (skip) {
return;
}
- groupSize = fileConfig.groupSizeInByte;
- fileConfig.groupSizeInByte = 8 * 1024 * 1024;
+ groupSize = TSFileConfig.groupSizeInByte;
+ TSFileConfig.groupSizeInByte = 8 * 1024 * 1024;
EnvironmentUtils.closeStatMonitor();
- deamon = IoTDB.getInstance();
- deamon.active();
+ daemon = IoTDB.getInstance();
+ daemon.active();
EnvironmentUtils.envSetUp();
executeSQL(setUpSqls);
}
@@ -79,9 +73,9 @@ public class IoTDBLogFileSizeTest {
if (skip) {
return;
}
- fileConfig.groupSizeInByte = groupSize;
+ TSFileConfig.groupSizeInByte = groupSize;
executeSQL(tearDownSqls);
- deamon.stop();
+ daemon.stop();
Thread.sleep(5000);
EnvironmentUtils.cleanEnv();
}
@@ -115,8 +109,7 @@ public class IoTDBLogFileSizeTest {
cnt);
statement.execute(sql);
WriteLogNode logNode = MultiFileLogNodeManager.getInstance().getNode(
- "root.logFileTest.bufferwrite" + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX, null,
- null);
+ "root.logFileTest.bufferwrite" + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX, null);
File bufferWriteWALFile = new File(
logNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME);
if (bufferWriteWALFile.exists() && bufferWriteWALFile.length() > maxLength[0]) {
@@ -132,7 +125,6 @@ public class IoTDBLogFileSizeTest {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
- return;
}
}
}
@@ -144,7 +136,7 @@ public class IoTDBLogFileSizeTest {
}
System.out.println(
- "Max size of bufferwrite wal is " + MemUtils.bytesCntToStr(maxLength[0]) + " after "
+ "Max size of TsFile wal is " + MemUtils.bytesCntToStr(maxLength[0]) + " after "
+ runtime + "ms continuous writing");
}
@@ -177,8 +169,7 @@ public class IoTDBLogFileSizeTest {
++cnt, cnt);
statement.execute(sql);
WriteLogNode logNode = MultiFileLogNodeManager.getInstance()
- .getNode("root.logFileTest.overflow" + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX, null,
- null);
+ .getNode("root.logFileTest.overflow" + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX, null);
File WALFile = new File(
logNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME);
if (WALFile.exists() && WALFile.length() > maxLength[0]) {
@@ -194,7 +185,6 @@ public class IoTDBLogFileSizeTest {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
- return;
}
}
}
@@ -210,12 +200,10 @@ public class IoTDBLogFileSizeTest {
+ "ms continuous writing");
}
- private void executeSQL(String[] sqls) throws ClassNotFoundException, SQLException {
+ private void executeSQL(String[] sqls) throws ClassNotFoundException {
Class.forName(Config.JDBC_DRIVER_NAME);
- Connection connection = null;
- try {
- connection = DriverManager
- .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")) {
Statement statement = connection.createStatement();
for (String sql : sqls) {
statement.execute(sql);
@@ -223,10 +211,6 @@ public class IoTDBLogFileSizeTest {
statement.close();
} catch (Exception e) {
e.printStackTrace();
- } finally {
- if (connection != null) {
- connection.close();
- }
}
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
index c76bc0b..9e71f63 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
@@ -22,8 +22,6 @@ import java.io.File;
import java.io.IOException;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StorageGroupManagerException;
-import org.apache.iotdb.db.exception.MetadataArgsErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.RecoverException;
import org.apache.iotdb.db.metadata.MManager;
@@ -34,8 +32,6 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -47,7 +43,6 @@ import org.junit.Test;
public class PerformanceTest {
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private TSFileConfig fileConfig = TSFileDescriptor.getInstance().getConfig();
private boolean enableWal;
private boolean skip = true;
@@ -75,19 +70,16 @@ public class PerformanceTest {
long[] forceCycle = new long[]{10, 0};
int oldBatchSize = config.getFlushWalThreshold();
long oldForceCycle = config.getForceWalPeriodInMs();
- for (int j = 0; j < batchSizes.length; j++) {
- for (int k = 0; k < forceCycle.length; k++) {
- config.setFlushWalThreshold(batchSizes[j]);
- config.setForceWalPeriodInMs(forceCycle[k]);
- File tempRestore = new File("testtemp", "restore");
- File tempProcessorStore = new File("testtemp", "processorStore");
+ for (int batchSize : batchSizes) {
+ for (long l : forceCycle) {
+ config.setFlushWalThreshold(batchSize);
+ config.setForceWalPeriodInMs(l);
+ File tempRestore = new File("testTemp", "restore");
tempRestore.getParentFile().mkdirs();
tempRestore.createNewFile();
- tempProcessorStore.createNewFile();
WriteLogNode logNode = new ExclusiveWriteLogNode("root.testLogNode",
- tempRestore.getPath(),
- tempProcessorStore.getPath());
+ tempRestore.getPath());
long time = System.currentTimeMillis();
for (int i = 0; i < 1000000; i++) {
@@ -111,7 +103,6 @@ public class PerformanceTest {
logNode.delete();
tempRestore.delete();
- tempProcessorStore.delete();
tempRestore.getParentFile().delete();
}
}
@@ -121,17 +112,14 @@ public class PerformanceTest {
@Test
public void recoverTest()
- throws IOException, RecoverException, StorageGroupManagerException, PathErrorException,
- MetadataArgsErrorException {
+ throws IOException, RecoverException, PathErrorException {
// this test write 1000000 * 3 logs , recover from them and report elapsed time
if (skip) {
return;
}
- File tempRestore = new File("testtemp", "restore");
- File tempProcessorStore = new File("testtemp", "processorStore");
+ File tempRestore = new File("testTemp", "restore");
tempRestore.getParentFile().mkdirs();
tempRestore.createNewFile();
- tempProcessorStore.createNewFile();
try {
MManager.getInstance().setStorageLevelToMTree("root.logTestDevice");
@@ -149,8 +137,7 @@ public class PerformanceTest {
MManager.getInstance().addPathToMTree("root.logTestDevice.s4", TSDataType.BOOLEAN.name(),
TSEncoding.PLAIN.name());
WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice",
- tempRestore.getPath(),
- tempProcessorStore.getPath());
+ tempRestore.getPath());
for (int i = 0; i < 1000000; i++) {
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100,
@@ -172,7 +159,6 @@ public class PerformanceTest {
} finally {
logNode.delete();
tempRestore.delete();
- tempProcessorStore.delete();
tempRestore.getParentFile().delete();
}
}
@@ -193,7 +179,7 @@ public class PerformanceTest {
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0",
new Path("root.logTestDevice.s1"));
for (int i = 0; i < 20; i++) {
- updatePlan.addInterval(new Pair<Long, Long>(200l, 300l));
+ updatePlan.addInterval(new Pair<>(200L, 300L));
}
DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
@@ -206,9 +192,9 @@ public class PerformanceTest {
time = System.currentTimeMillis();
for (int i = 0; i < 1000000; i++) {
- bwInsertPlan = (InsertPlan) PhysicalPlanLogTransfer.logToOperator(bytes1);
- updatePlan = (UpdatePlan) PhysicalPlanLogTransfer.logToOperator(bytes2);
- deletePlan = (DeletePlan) PhysicalPlanLogTransfer.logToOperator(bytes3);
+ PhysicalPlanLogTransfer.logToOperator(bytes1);
+ PhysicalPlanLogTransfer.logToOperator(bytes2);
+ PhysicalPlanLogTransfer.logToOperator(bytes3);
}
System.out.println("3000000 logs decoding use " + (System.currentTimeMillis() - time) + "ms");
}
@@ -222,13 +208,13 @@ public class PerformanceTest {
new String[]{"1.0", "15", "str", "false"});
long time = System.currentTimeMillis();
for (int i = 0; i < 1000000; i++) {
- byte[] bytes = PhysicalPlanLogTransfer.operatorToLog(bwInsertPlan);
+ PhysicalPlanLogTransfer.operatorToLog(bwInsertPlan);
}
System.out.println("1000000 logs encoding use " + (System.currentTimeMillis() - time) + "ms");
time = System.currentTimeMillis();
for (int i = 0; i < 1000000; i++) {
- byte[] bytes = sql.getBytes();
+ sql.getBytes();
}
System.out.println("1000000 sqls encoding use " + (System.currentTimeMillis() - time) + "ms");
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
index 9a9617c..5e9df4d 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,7 +38,6 @@ import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
import org.apache.iotdb.db.writelog.recover.ExclusiveLogRecoverPerformer;
-import org.apache.iotdb.db.writelog.recover.RecoverPerformer;
import org.apache.iotdb.db.writelog.replay.LogReplayer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.After;
@@ -68,19 +66,16 @@ public class RecoverTest {
@Test
public void testFullRecover() throws IOException, RecoverException {
// this test write a log file and try to recover from these logs as if no previous attempts exist.
- File tempRestore = new File("testtemp", "restore");
- File tempProcessorStore = new File("testtemp", "processorStore");
+ File tempRestore = new File("testTemp", "restore");
tempRestore.getParentFile().mkdirs();
tempRestore.createNewFile();
- tempProcessorStore.createNewFile();
try {
MManager.getInstance().setStorageLevelToMTree("root.testLogNode");
} catch (PathErrorException ignored) {
}
ExclusiveWriteLogNode logNode = new ExclusiveWriteLogNode("root.testLogNode",
- tempRestore.getPath(),
- tempProcessorStore.getPath());
+ tempRestore.getPath());
try {
InsertPlan bwInsertPlan = new InsertPlan(1, "logTestDevice", 100,
@@ -101,30 +96,25 @@ public class RecoverTest {
logNode.forceSync();
ExclusiveLogRecoverPerformer performer = new ExclusiveLogRecoverPerformer(
- tempRestore.getPath(),
- tempProcessorStore.getPath(), logNode);
+ tempRestore.getPath(), logNode);
// used to check if logs are replayed in order
DummyLogReplayer dummyLogReplayer = new DummyLogReplayer();
dummyLogReplayer.plansToCheck = plansToCheck;
performer.setReplayer(dummyLogReplayer);
- // used to check that FileNode does recover
- DummyFileNodeRecoverPerformer fileNodeRecoverPerformer = new DummyFileNodeRecoverPerformer();
- performer.setFileNodeRecoverPerformer(fileNodeRecoverPerformer);
+ // used to check that storage group does recover
logNode.setRecoverPerformer(performer);
logNode.recover();
- assertTrue(fileNodeRecoverPerformer.called);
// ensure all logs are replayed
assertEquals(plansToCheck.size(), dummyLogReplayer.currPos);
- // the log diretory should be empty now
+ // the log directory should be empty now
File logDir = new File(logNode.getLogDirectory());
File[] files = logDir.listFiles();
assertTrue(files == null || files.length == 0);
} finally {
logNode.delete();
tempRestore.delete();
- tempProcessorStore.delete();
tempRestore.getParentFile().delete();
}
}
@@ -134,25 +124,19 @@ public class RecoverTest {
// this test write a log file and try to recover from these logs as if a previous attempt is interrupted when
// recovering files or replaying logs.
// skip file backup by setting backup flag and creating backup files.
- File tempRestore = new File("testtemp", "restore");
- File tempProcessorStore = new File("testtemp", "processorStore");
- File tempRestoreRecovery = new File("testtemp",
+ File tempRestore = new File("testTemp", "restore");
+ File tempRestoreRecovery = new File("testTemp",
"restore" + ExclusiveLogRecoverPerformer.RECOVER_SUFFIX);
- File tempProcessorStoreRecovery = new File("testtemp",
- "processorStore" + ExclusiveLogRecoverPerformer.RECOVER_SUFFIX);
tempRestore.getParentFile().mkdirs();
tempRestore.createNewFile();
- tempProcessorStore.createNewFile();
tempRestoreRecovery.createNewFile();
- tempProcessorStoreRecovery.createNewFile();
try {
MManager.getInstance().setStorageLevelToMTree("root.testLogNode");
} catch (PathErrorException ignored) {
}
ExclusiveWriteLogNode logNode = new ExclusiveWriteLogNode("root.testLogNode",
- tempRestore.getPath(),
- tempProcessorStore.getPath());
+ tempRestore.getPath());
try {
// set flag
@@ -177,32 +161,26 @@ public class RecoverTest {
logNode.forceSync();
ExclusiveLogRecoverPerformer performer = new ExclusiveLogRecoverPerformer(
- tempRestore.getPath(),
- tempProcessorStore.getPath(), logNode);
+ tempRestore.getPath(), logNode);
// used to check if logs are replayed in order
DummyLogReplayer dummyLogReplayer = new DummyLogReplayer();
dummyLogReplayer.plansToCheck = plansToCheck;
performer.setReplayer(dummyLogReplayer);
// used to check that FileNode does recover
- DummyFileNodeRecoverPerformer fileNodeRecoverPerformer = new DummyFileNodeRecoverPerformer();
- performer.setFileNodeRecoverPerformer(fileNodeRecoverPerformer);
logNode.setRecoverPerformer(performer);
logNode.recover();
- assertTrue(fileNodeRecoverPerformer.called);
// ensure all logs are replayed
assertEquals(plansToCheck.size(), dummyLogReplayer.currPos);
- // the log diretory should be empty now
+ // the log directory should be empty now
File logDir = new File(logNode.getLogDirectory());
File[] files = logDir.listFiles();
assertTrue(files == null || files.length == 0);
} finally {
logNode.delete();
tempRestore.delete();
- tempProcessorStore.delete();
assertTrue(!tempRestoreRecovery.exists());
- assertTrue(!tempProcessorStoreRecovery.exists());
tempRestore.getParentFile().delete();
}
}
@@ -212,25 +190,19 @@ public class RecoverTest {
// this test write a log file and try to recover from these logs as if a previous attempt is interrupted when
// cleanup files.
// skip previous stage by setting backup flag and creating backup files.
- File tempRestore = new File("testtemp", "restore");
- File tempProcessorStore = new File("testtemp", "processorStore");
- File tempRestoreRecovery = new File("testtemp",
+ File tempRestore = new File("testTemp", "restore");
+ File tempRestoreRecovery = new File("testTemp",
"restore" + ExclusiveLogRecoverPerformer.RECOVER_SUFFIX);
- File tempProcessorStoreRecovery = new File("testtemp",
- "processorStore" + ExclusiveLogRecoverPerformer.RECOVER_SUFFIX);
tempRestore.getParentFile().mkdirs();
tempRestore.createNewFile();
- tempProcessorStore.createNewFile();
tempRestoreRecovery.createNewFile();
- tempProcessorStoreRecovery.createNewFile();
try {
MManager.getInstance().setStorageLevelToMTree("root.testLogNode");
} catch (PathErrorException ignored) {
}
ExclusiveWriteLogNode logNode = new ExclusiveWriteLogNode("root.testLogNode",
- tempRestore.getPath(),
- tempProcessorStore.getPath());
+ tempRestore.getPath());
try {
// set flag
@@ -244,58 +216,37 @@ public class RecoverTest {
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
- List<PhysicalPlan> plansToCheck = new ArrayList<>();
- plansToCheck.add(bwInsertPlan);
- plansToCheck.add(updatePlan);
- plansToCheck.add(deletePlan);
-
logNode.write(bwInsertPlan);
logNode.write(updatePlan);
logNode.write(deletePlan);
logNode.forceSync();
ExclusiveLogRecoverPerformer performer = new ExclusiveLogRecoverPerformer(
- tempRestore.getPath(),
- tempProcessorStore.getPath(), logNode);
+ tempRestore.getPath(), logNode);
// used to check that no log is replayed
DummyLogReplayer dummyLogReplayer = new DummyLogReplayer();
performer.setReplayer(dummyLogReplayer);
// used to check that FileNode does recover
- DummyFileNodeRecoverPerformer fileNodeRecoverPerformer = new DummyFileNodeRecoverPerformer();
- performer.setFileNodeRecoverPerformer(fileNodeRecoverPerformer);
logNode.setRecoverPerformer(performer);
logNode.recover();
- assertTrue(!fileNodeRecoverPerformer.called);
- // the log diretory should be empty now
+ // the log directory should be empty now
File logDir = new File(logNode.getLogDirectory());
File[] files = logDir.listFiles();
assertTrue(files == null || files.length == 0);
} finally {
logNode.delete();
tempRestore.delete();
- tempProcessorStore.delete();
assertTrue(!tempRestoreRecovery.exists());
- assertTrue(!tempProcessorStoreRecovery.exists());
tempRestore.getParentFile().delete();
}
}
- class DummyFileNodeRecoverPerformer implements RecoverPerformer {
-
- public boolean called = false;
-
- @Override
- public void recover() {
- called = true;
- }
- }
-
class DummyLogReplayer implements LogReplayer {
- public List<PhysicalPlan> plansToCheck;
- public int currPos = 0;
+ List<PhysicalPlan> plansToCheck;
+ int currPos = 0;
@Override
public void replay(PhysicalPlan plan, boolean isOverflow) throws ProcessorException {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
index e03c418..a12d4b9 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
@@ -22,10 +22,8 @@ import static junit.framework.TestCase.assertTrue;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.MetadataArgsErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.RecoverException;
import org.apache.iotdb.db.metadata.MManager;
@@ -72,7 +70,7 @@ public class WriteLogNodeManagerTest {
WriteLogNodeManager manager = MultiFileLogNodeManager.getInstance();
WriteLogNode logNode = manager
- .getNode("root.managerTest", tempRestore.getPath(), tempProcessorStore.getPath());
+ .getNode("root.managerTest", tempRestore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, "logTestDevice", 100,
new String[]{"s1", "s2", "s3", "s4"},
@@ -98,10 +96,9 @@ public class WriteLogNodeManagerTest {
}
@Test
- public void testRecoverAll() throws IOException, RecoverException, MetadataArgsErrorException {
+ public void testRecoverAll() throws IOException, RecoverException {
// this test create 5 log nodes and recover them
File tempRestore = File.createTempFile("managerTest", "restore");
- File tempProcessorStore = File.createTempFile("managerTest", "processorStore");
WriteLogNodeManager manager = MultiFileLogNodeManager.getInstance();
for (int i = 0; i < 5; i++) {
@@ -119,16 +116,16 @@ public class WriteLogNodeManagerTest {
} catch (PathErrorException ignored) {
}
WriteLogNode logNode = manager
- .getNode(deviceName, tempRestore.getPath(), tempProcessorStore.getPath());
+ .getNode(deviceName, tempRestore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, deviceName, 100,
new String[]{"s1", "s2", "s3", "s4"},
new String[]{"1.0", "15", "str", "false"});
- UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path(deviceName + ".s1"));
+ // UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path(deviceName + ".s1"));
DeletePlan deletePlan = new DeletePlan(50, new Path(deviceName + ".s1"));
logNode.write(bwInsertPlan);
- logNode.write(updatePlan);
+ // logNode.write(updatePlan);
logNode.write(deletePlan);
logNode.forceSync();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index d125ebc..e92b94c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -24,7 +24,6 @@ import static junit.framework.TestCase.assertTrue;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.util.Arrays;
import java.util.zip.CRC32;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -64,15 +63,12 @@ public class WriteLogNodeTest {
public void testWriteLogAndSync() throws IOException {
// this test uses a dummy write log node to write a few logs and flushes them
// then reads the logs from file
- File tempRestore = new File("testtemp", "restore");
- File tempProcessorStore = new File("testtemp", "processorStore");
+ File tempRestore = new File("testTemp", "restore");
tempRestore.getParentFile().mkdirs();
tempRestore.createNewFile();
- tempProcessorStore.createNewFile();
CRC32 crc32 = new CRC32();
- WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", tempRestore.getPath(),
- tempProcessorStore.getPath());
+ WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", tempRestore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100,
new String[]{"s1", "s2", "s3", "s4"},
@@ -92,7 +88,7 @@ public class WriteLogNodeTest {
RandomAccessFile raf = new RandomAccessFile(walFile, "r");
byte[] buffer = new byte[10 * 1024 * 1024];
- int logSize = 0;
+ int logSize;
logSize = raf.readInt();
long checksum = raf.readLong();
raf.read(buffer, 0, logSize);
@@ -131,7 +127,6 @@ public class WriteLogNodeTest {
raf.close();
logNode.delete();
tempRestore.delete();
- tempProcessorStore.delete();
tempRestore.getParentFile().delete();
}
@@ -139,14 +134,11 @@ public class WriteLogNodeTest {
public void testNotifyFlush() throws IOException {
// this test writes a few logs and sync them
// then calls notifyStartFlush() and notifyEndFlush() to delete old file
- File tempRestore = new File("testtemp", "restore");
- File tempProcessorStore = new File("testtemp", "processorStore");
+ File tempRestore = new File("testTemp", "restore");
tempRestore.getParentFile().mkdirs();
tempRestore.createNewFile();
- tempProcessorStore.createNewFile();
- WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", tempRestore.getPath(),
- tempProcessorStore.getPath());
+ WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", tempRestore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100,
new String[]{"s1", "s2", "s3", "s4"},
@@ -176,7 +168,6 @@ public class WriteLogNodeTest {
logNode.delete();
tempRestore.delete();
- tempProcessorStore.delete();
tempRestore.getParentFile().delete();
}
@@ -185,14 +176,11 @@ public class WriteLogNodeTest {
// this test checks that if more logs than threshold are written, a sync will be triggered.
int flushWalThreshold = config.getFlushWalThreshold();
config.setFlushWalThreshold(3);
- File tempRestore = new File("testtemp", "restore");
- File tempProcessorStore = new File("testtemp", "processorStore");
+ File tempRestore = new File("testTemp", "restore");
tempRestore.getParentFile().mkdirs();
tempRestore.createNewFile();
- tempProcessorStore.createNewFile();
- WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", tempRestore.getPath(),
- tempProcessorStore.getPath());
+ WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", tempRestore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100,
new String[]{"s1", "s2", "s3", "s4"},
@@ -212,7 +200,6 @@ public class WriteLogNodeTest {
logNode.delete();
tempRestore.delete();
- tempProcessorStore.delete();
config.setFlushWalThreshold(flushWalThreshold);
tempRestore.getParentFile().delete();
}
@@ -221,14 +208,11 @@ public class WriteLogNodeTest {
public void testDelete() throws IOException {
// this test uses a dummy write log node to write a few logs and flushes them
// then deletes the node
- File tempRestore = new File("testtemp", "restore");
- File tempProcessorStore = new File("testtemp", "processorStore");
+ File tempRestore = new File("testTemp", "restore");
tempRestore.getParentFile().mkdirs();
tempRestore.createNewFile();
- tempProcessorStore.createNewFile();
- WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", tempRestore.getPath(),
- tempProcessorStore.getPath());
+ WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", tempRestore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, "logTestDevice", 100,
new String[]{"s1", "s2", "s3", "s4"},
@@ -251,22 +235,18 @@ public class WriteLogNodeTest {
assertTrue(!new File(logNode.getLogDirectory()).exists());
tempRestore.delete();
- tempProcessorStore.delete();
tempRestore.getParentFile().delete();
}
@Test
public void testOverSizedWAL() throws IOException {
// this test uses a dummy write log node to write an over-sized log and assert exception caught
- File tempRestore = new File("testtemp", "restore");
- File tempProcessorStore = new File("testtemp", "processorStore");
+ File tempRestore = new File("testTemp", "restore");
tempRestore.getParentFile().mkdirs();
tempRestore.createNewFile();
- tempProcessorStore.createNewFile();
WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice.oversize",
- tempRestore.getPath(),
- tempProcessorStore.getPath());
+ tempRestore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice.oversize", 100,
new String[]{"s1", "s2", "s3", "s4"},
@@ -282,7 +262,6 @@ public class WriteLogNodeTest {
logNode.delete();
tempRestore.delete();
- tempProcessorStore.delete();
tempRestore.getParentFile().delete();
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
index 3dfa398..ed410ee 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
@@ -31,6 +31,9 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
*/
public class ChunkMetaData {
+ // TODO: remove hard coding
+ public static long estimatedObjectByteSize = 1500;
+
private String measurementUid;
/**
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
index 2cd0f58..9f2659e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
@@ -108,4 +108,6 @@ public abstract class FileSeriesReader {
private ChunkMetaData nextChunkMeta() {
return chunkMetaDataList.get(chunkToRead++);
}
+
+
}