You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/04/20 15:58:20 UTC
[incubator-iotdb] 01/01: revert to status without new BufferWriter
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch replace_list_by_array
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit d1cbd768d6fe243d308b430cc0405ef3b3962ed1
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sat Apr 20 23:57:56 2019 +0800
revert to status without new BufferWriter
---
iotdb/iotdb/conf/logback.xml | 2 -
.../db/engine/tsfiledata/TsFileProcessor.java | 823 ---------------------
.../db/query/control/QueryResourceManager.java | 23 -
.../EngineExecutorWithoutTimeGenerator.java | 58 --
.../iotdb/db/query/executor/EngineQueryRouter.java | 20 -
.../db/engine/tsfiledata/TsFileProcessorTest.java | 251 -------
.../integration/QueryDataFromUnclosedTsFileIT.java | 107 ---
7 files changed, 1284 deletions(-)
diff --git a/iotdb/iotdb/conf/logback.xml b/iotdb/iotdb/conf/logback.xml
index 1f8a915..2629723 100644
--- a/iotdb/iotdb/conf/logback.xml
+++ b/iotdb/iotdb/conf/logback.xml
@@ -111,7 +111,6 @@
</appender>
<logger level="info" name="org.apache.iotdb.db.service"/>
<logger level="info" name="org.apache.iotdb.db.conf"/>
-
<!-- a log appender that collect all log records whose level is greather than debug-->
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILEALL">
<file>${IOTDB_HOME}/logs/log_all.log</file>
@@ -130,7 +129,6 @@
<level>INFO</level>
</filter>
</appender>
-
<root level="info">
<appender-ref ref="FILEDEBUG"/>
<appender-ref ref="FILEWARN"/>
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
deleted file mode 100644
index 3e7bba4..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ /dev/null
@@ -1,823 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.tsfiledata;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.Processor;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
-import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
-import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
-import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
-import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.pool.FlushManager;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
-import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
-import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.qp.constant.DatetimeUtils;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.utils.ImmediateFuture;
-import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.db.utils.QueryUtils;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Each storage group has a TsFileProcessor instance. Though there are many clients read/write data
- * by accessing TsFileProcessor, they need to get the readWriteReentrantLock of this processor
- * first. Therefore, as for different clients, the class looks like thread safe.
- * <br/> The class has two backend tasks:
- * (1) submit a flush job to flush data from memtable to disk async;
- * (2) close a tsfile and open a new one.
- * users also need to get the write lock to call these two tasks.
- */
-public class TsFileProcessor extends Processor {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(TsFileProcessor.class);
- public static final int WRITE_SUCCESS = 1;
- public static final int WRITE_IN_WARNING_MEM= 0;
- public static final int WRITE_REJECT_BY_TIME = -1;
- public static final int WRITE_REJECT_BY_MEM = -2;
-
- //this is just a part of fileSchemaRef: only the measurements that belong to this TsFileProcessor
- // are in this fileSchemaRef. And, this filed is shared with other classes (i.e., storage group
- // processor), so be careful if you modify it.
- private FileSchema fileSchemaRef;
-
-
- private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
- //when a flush task is in the backend, then it has not conflict with write operatons (because
- // write operation modifies wokMemtable, while flush task uses flushMemtable). However, the flush
- // task has concurrent problem with query operations, because the query needs to read data from
- // flushMemtable and the table may be clear if the flush operation ends. So, a Lock is needed.
- private ReentrantLock flushQueryLock = new ReentrantLock();
- private AtomicLong memSize = new AtomicLong();
-
- //fileNamePrefix (system time, rather than data time) time unit: nanosecond
- // this is used for generate the new TsFile name
- private long fileNamePrefix = System.nanoTime();
- //the times of calling insertion function (between two flush operations).
- private long valueCount = 0;
-
-
- private IMemTable workMemTable;
- private IMemTable flushMemTable;
- private RestorableTsFileIOWriter writer;
- private Action beforeFlushAction;
- private Action afterCloseAction;
- private Action afterFlushAction;
- private File insertFile;
- private TsFileResource currentResource;
-
- private List<TsFileResource> tsFileResources;
- private Map<String, List<TsFileResource>> inverseIndexOfResource;
-
- private Map<String, Long> minWrittenTimeForEachDeviceInCurrentFile;
- private Map<String, Long> maxWrittenTimeForEachDeviceInCurrentFile;
- private Map<String, Long> lastFlushedTimeForEachDevice;
-
- private WriteLogNode logNode;
- private VersionController versionController;
-
- /**
- * constructor of BufferWriteProcessor. data will be stored in baseDir/processorName/ folder.
- *
- * @param processorName processor name
- * @param fileSchemaRef file schema
- * @throws BufferWriteProcessorException BufferWriteProcessorException
- */
- @SuppressWarnings({"squid:S2259", "squid:S3776"})
- public TsFileProcessor(String processorName,
- Action beforeFlushAction, Action afterFlushAction, Action afterCloseAction,
- VersionController versionController,
- FileSchema fileSchemaRef) throws BufferWriteProcessorException, IOException {
- super(processorName);
- this.fileSchemaRef = fileSchemaRef;
- this.processorName = processorName;
-
- this.beforeFlushAction = beforeFlushAction;
- this.afterCloseAction = afterCloseAction;
- this.afterFlushAction = afterFlushAction;
- workMemTable = new PrimitiveMemTable();
- tsFileResources = new ArrayList<>();
- inverseIndexOfResource = new HashMap<>();
- lastFlushedTimeForEachDevice = new HashMap<>();
- minWrittenTimeForEachDeviceInCurrentFile = new HashMap<>();
- maxWrittenTimeForEachDeviceInCurrentFile = new HashMap<>();
- File unclosedFile = null;
- String unclosedFileName = null;
- int unclosedFileCount = 0;
- for (String folderPath : Directories.getInstance().getAllTsFileFolders()) {
- File dataFolder = new File(folderPath, processorName);
- if (dataFolder.exists()) {
- // we do not add the unclosed tsfile into tsFileResources.
- File[] unclosedFiles = dataFolder
- .listFiles(x -> x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX));
- unclosedFileCount += unclosedFiles.length;
- if (unclosedFileCount > 1) {
- break;
- } else if (unclosedFileCount == 1) {
- unclosedFileName = unclosedFiles[0].getName()
- .split(RestorableTsFileIOWriter.RESTORE_SUFFIX)[0];
- unclosedFile = new File(unclosedFiles[0].getParentFile(), unclosedFileName);
- }
- File[] datas = dataFolder
- .listFiles(x -> !x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX) && x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR).length == 2);
- Arrays.sort(datas, Comparator.comparingLong(x -> Long.parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
- for (File tsfile : datas) {
- //TODO we'd better define a file suffix for TsFile, e.g., .ts
- String[] names = tsfile.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR);
- long time = Long.parseLong(names[0]);
- if (fileNamePrefix < time) {
- fileNamePrefix = time;
- }
- if (unclosedFileCount == 0 || !tsfile.getName().equals(unclosedFileName)) {
- TsFileResource resource = new TsFileResource(tsfile, true);
- tsFileResources.add(resource);
- //maintain the inverse index and fileNamePrefix
- for (String device : resource.getDevices()) {
- inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>()).add(resource);
- lastFlushedTimeForEachDevice
- .merge(device, resource.getEndTime(device), (x, y) -> x > y ? x : y);
- }
- }
- }
- } else {
- //processor folder does not exist
- dataFolder.mkdirs();
- }
- }
- if (unclosedFileCount > 1) {
- throw new BufferWriteProcessorException(String
- .format("TsProcessor %s has more than one unclosed TsFile. please repair it",
- processorName));
- } else if (unclosedFileCount == 0){
- unclosedFile = generateNewTsFilePath();
- }
-
- initCurrentTsFile(unclosedFile);
-
- this.versionController = versionController;
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- try {
- logNode = MultiFileLogNodeManager.getInstance().getNode(
- processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
- writer.getRestoreFilePath(),
- FileNodeManager.getInstance().getRestoreFilePath(processorName));
- } catch (IOException e) {
- throw new BufferWriteProcessorException(e);
- }
- }
- }
-
-
- private File generateNewTsFilePath() throws BufferWriteProcessorException {
- String dataDir = Directories.getInstance().getNextFolderForTsfile();
- File dataFolder = new File(dataDir, processorName);
- if (!dataFolder.exists()) {
- if (!dataFolder.mkdirs()) {
- throw new BufferWriteProcessorException(
- String.format("Can not create TsFileProcess related folder: %s", dataFolder));
- }
- LOGGER.debug("The bufferwrite processor data dir doesn't exists, create new directory {}.",
- dataFolder.getAbsolutePath());
- }
- String fileName = (fileNamePrefix + 1) + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
- + System.currentTimeMillis();
- return new File(dataFolder, fileName);
- }
-
-
- private void initCurrentTsFile(File file) throws BufferWriteProcessorException {
- this.insertFile = file;
- try {
- writer = new RestorableTsFileIOWriter(processorName, insertFile.getAbsolutePath());
- this.currentResource = new TsFileResource(insertFile, writer);
- } catch (IOException e) {
- throw new BufferWriteProcessorException(e);
- }
-
- minWrittenTimeForEachDeviceInCurrentFile.clear();
- maxWrittenTimeForEachDeviceInCurrentFile.clear();
-
- }
-
- /**
- * wrete a ts record into the memtable. If the memory usage is beyond the memThreshold, an async
- * flushing operation will be called.
- *
- * @param plan data to be written
- * @return - 1 (WRITE_SUCCESS) if the tsRecord can be inserted into tsFile.
- * - 0 (WRITE_IN_WARNING_MEM) if the memory is UsageLevel.WARNING
- * - -1 (WRITE_REJECT_BY_TIME) if you need to insert it into overflow
- * - -2 (WRITE_REJECT_BY_MEM) if the memory is UsageLevel.ERROR
- * @throws BufferWriteProcessorException if a flushing operation occurs and failed.
- */
- public int insert(InsertPlan plan) throws BufferWriteProcessorException, IOException {
- if (lastFlushedTimeForEachDevice.containsKey(plan.getDeviceId()) && plan.getTime() <= lastFlushedTimeForEachDevice.get(plan.getDeviceId())) {
- return WRITE_REJECT_BY_TIME;
- }
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- logNode.write(plan);
- }
- long memUsage = 0;
- TSDataType type;
- String measurement;
- for (int i=0; i < plan.getMeasurements().length; i++){
- measurement = plan.getMeasurements()[i];
- type = fileSchemaRef.getMeasurementDataType(measurement);
- memUsage += MemUtils.getPointSize(type, measurement);
- }
- UsageLevel level = BasicMemController.getInstance().acquireUsage(this, memUsage);
- switch (level) {
- case SAFE:
- doInsert(plan);
- checkMemThreshold4Flush(memUsage);
- return WRITE_SUCCESS;
- case WARNING:
- if(LOGGER.isWarnEnabled()) {
- LOGGER.warn("Memory usage will exceed warning threshold, current : {}.",
- MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
- }
- doInsert(plan);
- try {
- flush();
- } catch (IOException e) {
- throw new BufferWriteProcessorException(e);
- }
- return WRITE_IN_WARNING_MEM;
- case DANGEROUS:
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.",
- MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
- }
- return WRITE_REJECT_BY_MEM;
- default:
- return WRITE_REJECT_BY_MEM;
- }
- }
-
- private void doInsert(InsertPlan plan) {
- String deviceId = plan.getDeviceId();
- long time = plan.getTime();
- TSDataType type;
- String measurement;
- for (int i=0; i < plan.getMeasurements().length; i++){
- measurement = plan.getMeasurements()[i];
- type = fileSchemaRef.getMeasurementDataType(measurement);
- workMemTable.write(deviceId, measurement, type, time, plan.getValues()[i]);
- }
- if (!minWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)) {
- minWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
- }
- if (!maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId) || maxWrittenTimeForEachDeviceInCurrentFile
- .get(deviceId) < time) {
- maxWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
- }
- valueCount++;
- }
-
- /**
- * Delete data whose timestamp <= 'timestamp' and belonging to timeseries deviceId.measurementId.
- * Delete data in both working MemTable and flushing MemTable.
- *
- * @param deviceId the deviceId of the timeseries to be deleted.
- * @param measurementId the measurementId of the timeseries to be deleted.
- * @param timestamp the upper-bound of deletion time.
- */
- public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
- workMemTable.delete(deviceId, measurementId, timestamp);
- if (maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId) && maxWrittenTimeForEachDeviceInCurrentFile
- .get(deviceId) < timestamp) {
- maxWrittenTimeForEachDeviceInCurrentFile
- .put(deviceId, lastFlushedTimeForEachDevice.getOrDefault(deviceId, 0L));
- }
- boolean deleteFlushTable = false;
- if (isFlush()) {
- // flushing MemTable cannot be directly modified since another thread is reading it
- flushMemTable = flushMemTable.copy();
- deleteFlushTable = flushMemTable.delete(deviceId, measurementId, timestamp);
- }
- String fullPath = deviceId +
- IoTDBConstant.PATH_SEPARATOR + measurementId;
- Deletion deletion = new Deletion(fullPath, versionController.nextVersion(), timestamp);
- if (deleteFlushTable || (currentResource.containsDevice(deviceId) && currentResource.getStartTime(deviceId) <= timestamp)) {
- currentResource.getModFile().write(deletion);
- }
- for (TsFileResource resource : tsFileResources) {
- if (resource.containsDevice(deviceId) && resource.getStartTime(deviceId) <= timestamp) {
- resource.getModFile().write(deletion);
- }
- }
- if (lastFlushedTimeForEachDevice.containsKey(deviceId) && lastFlushedTimeForEachDevice.get(deviceId) <= timestamp) {
- lastFlushedTimeForEachDevice.put(deviceId, 0L);
- }
- }
-
-
- private void checkMemThreshold4Flush(long addedMemory) throws BufferWriteProcessorException {
- long newMem = memSize.addAndGet(addedMemory);
- if (newMem > TSFileConfig.groupSizeInByte) {
- if (LOGGER.isInfoEnabled()) {
- String usageMem = MemUtils.bytesCntToStr(newMem);
- String threshold = MemUtils.bytesCntToStr(TSFileConfig.groupSizeInByte);
- LOGGER.info("The usage of memory {} in bufferwrite processor {} reaches the threshold {}",
- usageMem, processorName, threshold);
- }
- try {
- flush();
- } catch (IOException e) {
- LOGGER.error("Flush bufferwrite error.", e);
- throw new BufferWriteProcessorException(e);
- }
- }
- }
-
-
- /**
- * this method is for preparing a task task and then submitting it.
- * @return
- * @throws IOException
- */
- @Override
- public Future<Boolean> flush() throws IOException {
- // waiting for the end of last flush operation.
- try {
- flushFuture.get();
- } catch (InterruptedException | ExecutionException e) {
- LOGGER.error(
- "Encounter an interrupt error when waitting for the flushing, the TsFile Processor is {}.",
- getProcessorName(), e);
- Thread.currentThread().interrupt();
- }
- // statistic information for flush
- if (valueCount <= 0) {
- LOGGER.debug(
- "TsFile Processor {} has zero data to be flushed, will return directly.", processorName);
- flushFuture = new ImmediateFuture<>(true);
- return flushFuture;
- }
-
- if (LOGGER.isInfoEnabled()) {
- long thisFlushTime = System.currentTimeMillis();
- LOGGER.info(
- "The TsFile Processor {}: last flush time is {}, this flush time is {}, "
- + "flush time interval is {} s", getProcessorName(),
- DatetimeUtils.convertMillsecondToZonedDateTime(fileNamePrefix / 1000),
- DatetimeUtils.convertMillsecondToZonedDateTime(thisFlushTime),
- (thisFlushTime - fileNamePrefix / 1000) / 1000);
- }
- fileNamePrefix = System.nanoTime();
-
- // update the lastUpdatetime, prepare for flush
- try {
- beforeFlushAction.act();
- } catch (Exception e) {
- LOGGER.error("Failed to flush memtable into tsfile when calling the action function.");
- throw new IOException(e);
- }
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- logNode.notifyStartFlush();
- }
- valueCount = 0;
- switchWorkToFlush();
- long version = versionController.nextVersion();
- BasicMemController.getInstance().releaseUsage(this, memSize.get());
- memSize.set(0);
- // switch
- flushFuture = FlushManager.getInstance().submit(() -> flushTask(version));
- return flushFuture;
- }
-
- /**
- * this method will be concurrent with other methods..
- *
- * @param version the operation version that will tagged on the to be flushed memtable (i.e.,
- * ChunkGroup)
- * @return true if successfully.
- */
- private boolean flushTask(long version) {
- boolean result;
- long flushStartTime = System.currentTimeMillis();
- LOGGER.info("The TsFile Processor {} starts flushing.", processorName);
- try {
- if (flushMemTable != null && !flushMemTable.isEmpty()) {
- // flush data
- MemTableFlushUtil.flushMemTable(fileSchemaRef, writer, flushMemTable,
- version);
- // write restore information
- writer.flush();
- } else {
- //no need to flush.
- return true;
- }
-
- afterFlushAction.act();
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- logNode.notifyEndFlush(null);
- }
- result = true;
- } catch (Exception e) {
- LOGGER.error(
- "The TsFile Processor {} failed to flush, when calling the afterFlushAction(filenodeFlushAction).",
- processorName, e);
- result = false;
- } finally {
- try {
- switchFlushToWork();
- } catch (BufferWriteProcessorException e) {
- LOGGER.error(e.getMessage());
- result = false;
- }
- LOGGER.info("The TsFile Processor {} ends flushing.", processorName);
- }
- if (LOGGER.isInfoEnabled()) {
- long flushEndTime = System.currentTimeMillis();
- LOGGER.info(
- "The TsFile Processor {} flush, start time is {}, flush end time is {}, "
- + "flush time consumption is {}ms",
- processorName,
- DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
- DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
- flushEndTime - flushStartTime);
- }
- return result;
- }
-
- private void switchWorkToFlush() {
- flushQueryLock.lock();
- try {
- if (flushMemTable == null) {
- flushMemTable = workMemTable;
- for (String device : flushMemTable.getMemTableMap().keySet()) {
- lastFlushedTimeForEachDevice.put(device, maxWrittenTimeForEachDeviceInCurrentFile.get(device));
- }
- workMemTable = new PrimitiveMemTable();
- }
- } finally {
- flushQueryLock.unlock();
- }
- }
-
- private void switchFlushToWork() throws BufferWriteProcessorException {
- flushQueryLock.lock();
- try {
- //we update the index of currentTsResource.
- for (String device : flushMemTable.getMemTableMap().keySet()) {
- currentResource.setStartTime(device, minWrittenTimeForEachDeviceInCurrentFile.get(device));
- // new end time must be larger than the old one.
- currentResource.setEndTime(device, lastFlushedTimeForEachDevice.get(device));
- }
- flushMemTable.clear();
- flushMemTable = null;
- //make chunk groups in this flush task visble
- writer.appendMetadata();
- if (needCloseCurrentFile()) {
- closeCurrentTsFileAndOpenNewOne();
- }
- } finally {
- flushQueryLock.unlock();
- }
- }
-
- @Override
- public boolean canBeClosed() {
- return true;
- }
-
-
- /**
- * this method do not call flush() to flush data in memory to disk.
- * @throws BufferWriteProcessorException
- */
- private void closeCurrentTsFileAndOpenNewOne() throws BufferWriteProcessorException {
- closeCurrentFile();
- initCurrentTsFile(generateNewTsFilePath());
- }
-
- //very dangerous, how to make sure this function is thread safe (no other functions are running)
- private void closeCurrentFile() throws BufferWriteProcessorException {
- try {
- long closeStartTime = System.currentTimeMillis();
- // end file
- if (writer.getChunkGroupMetaDatas().isEmpty()){
- //this is an empty TsFile, we do not need to save it...
- writer.endFile(fileSchemaRef);
- Files.delete(Paths.get(insertFile.getAbsolutePath()));
- } else {
- writer.endFile(fileSchemaRef);
- // update the IntervalFile for interval list
- afterCloseAction.act();
- // flush the changed information for filenode
- afterFlushAction.act();
-
- tsFileResources.add(currentResource);
- //maintain the inverse index
- for (String device : currentResource.getDevices()) {
- inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>())
- .add(currentResource);
- }
- }
-
- // delete the restore for this bufferwrite processor
- if (LOGGER.isInfoEnabled()) {
- long closeEndTime = System.currentTimeMillis();
- LOGGER.info(
- "Close current TsFile {}, start time is {}, end time is {}, time consumption is {}ms",
- insertFile.getAbsolutePath(),
- DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
- DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
- closeEndTime - closeStartTime);
- }
-
- workMemTable.clear();
- if (flushMemTable != null) {
- flushMemTable.clear();
- }
- if (logNode != null) {
- logNode.close();
- }
-
- } catch (IOException e) {
- LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.",
- getProcessorName(), e);
- throw new BufferWriteProcessorException(e);
- } catch (Exception e) {
- LOGGER
- .error("Failed to close the bufferwrite processor when calling the action function.", e);
- throw new BufferWriteProcessorException(e);
- }
- }
-
- @Override
- public long memoryUsage() {
- return 0;
- }
-
- /**
- * check if is flushing.
- *
- * @return True if flushing
- */
- public boolean isFlush() {
- // starting a flush task has two steps: set the flushMemtable, and then set the flushFuture
- // So, the following case exists: flushMemtable != null but flushFuture is done (because the
- // flushFuture refers to the last finished flush.
- // And, the following case exists,too: flushMemtable == null, but flushFuture is not done.
- // (flushTask() is not finished, but switchToWork() has done)
- // So, checking flushMemTable is more meaningful than flushFuture.isDone().
- return flushMemTable != null;
- }
-
-
- /**
- * query data.
- */
- public GlobalSortedSeriesDataSource query(SingleSeriesExpression expression,
- QueryContext context) throws IOException {
- MeasurementSchema mSchema;
- TSDataType dataType;
-
- String deviceId = expression.getSeriesPath().getDevice();
- String measurementId = expression.getSeriesPath().getMeasurement();
-
- mSchema = fileSchemaRef.getMeasurementSchema(measurementId);
- dataType = mSchema.getType();
-
- // tsfile dataØØ
- List<TsFileResource> dataFiles = new ArrayList<>();
- for (TsFileResource tsfile : tsFileResources) {
- //TODO in the old version, tsfile is deep copied. I do not know why
- dataFiles.add(tsfile);
- }
- // bufferwrite data
- //TODO unsealedTsFile class is a little redundant.
- UnsealedTsFile unsealedTsFile = null;
-
- if (currentResource.getStartTime(deviceId) >= 0) {
- unsealedTsFile = new UnsealedTsFile();
- unsealedTsFile.setFilePath(currentResource.getFile().getAbsolutePath());
- List<ChunkMetaData> chunks = writer.getMetadatas(deviceId, measurementId, dataType);
- List<Modification> pathModifications = context.getPathModifications(
- currentResource.getModFile(), deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId);
- if (!pathModifications.isEmpty()) {
- QueryUtils.modifyChunkMetaData(chunks, pathModifications);
- }
-
- unsealedTsFile.setTimeSeriesChunkMetaDatas(chunks);
- }
- return new GlobalSortedSeriesDataSource(
- new Path(deviceId + "." + measurementId), dataFiles, unsealedTsFile,
- queryDataInMemtable(deviceId, measurementId, dataType, mSchema.getProps()));
- }
-
- /**
- * get the one (or two) chunk(s) in the memtable ( and the other one in flushing status and then
- * compact them into one TimeValuePairSorter). Then get its (or their) ChunkMetadata(s).
- *
- * @param deviceId device id
- * @param measurementId sensor id
- * @param dataType data type
- * @return corresponding chunk data in memory
- */
- private ReadOnlyMemChunk queryDataInMemtable(String deviceId,
- String measurementId, TSDataType dataType, Map<String, String> props) {
- flushQueryLock.lock();
- try {
- MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
- if (flushMemTable != null) {
- memSeriesLazyMerger.addMemSeries(flushMemTable.query(deviceId, measurementId, dataType, props));
- }
- memSeriesLazyMerger.addMemSeries(workMemTable.query(deviceId, measurementId, dataType, props));
- // memSeriesLazyMerger has handled the props,
- // so we do not need to handle it again in the following readOnlyMemChunk
- return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger, Collections.emptyMap());
- } finally {
- flushQueryLock.unlock();
- }
- }
-
-
- public String getInsertFilePath() {
- return insertFile.getAbsolutePath();
- }
-
- public WriteLogNode getLogNode() {
- return logNode;
- }
-
- /**
- * used for test. We can know when the flush() is called.
- *
- * @return the last flush() time. Time unit: nanosecond.
- */
- public long getFileNamePrefix() {
- return fileNamePrefix;
- }
-
- /**
- * used for test. We can block to wait for finishing flushing.
- *
- * @return the future of the flush() task.
- */
- public Future<Boolean> getFlushFuture() {
- return flushFuture;
- }
-
- @Override
- public void close() throws BufferWriteProcessorException {
- closeCurrentFile();
- try {
- if (currentResource != null) {
- currentResource.close();
- }
- for (TsFileResource resource : tsFileResources) {
- resource.close();
- }
-
- } catch (IOException e) {
- throw new BufferWriteProcessorException(e);
- }
- }
-
- /**
- * remove all data of this processor. Used For UT
- */
- public void removeMe() throws BufferWriteProcessorException, IOException {
- try {
- flushFuture.get(10000, TimeUnit.MILLISECONDS);
- } catch (ExecutionException | TimeoutException e) {
- LOGGER.error("can not end running flush task in 10 seconds: {}", e.getMessage());
- } catch (InterruptedException e) {
- LOGGER.error("current running flush task is interrupted.", e);
- Thread.currentThread().interrupt();
- }
- close();
- for (String folder : Directories.getInstance().getAllTsFileFolders()) {
- File dataFolder = new File(folder, processorName);
- if (dataFolder.exists()) {
- for (File file: dataFolder.listFiles()) {
- Files.deleteIfExists(Paths.get(file.getAbsolutePath()));
- }
- }
- }
- }
-
- /**
- * Check if the currentTsFileResource is toooo large.
- * @return true if the file is too large.
- */
- private boolean needCloseCurrentFile() {
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- long fileSize = currentResource.getFile().length();
- if (fileSize >= config.getBufferwriteFileSizeThreshold()) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info(
- "The bufferwrite processor {}, size({}) of the file {} reaches threshold {}.",
- processorName, MemUtils.bytesCntToStr(fileSize), currentResource.getFilePath(),
- MemUtils.bytesCntToStr(config.getBufferwriteFileSizeThreshold()));
- }
- return true;
- }
- return false;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof TsFileProcessor)) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- TsFileProcessor that = (TsFileProcessor) o;
- return fileNamePrefix == that.fileNamePrefix &&
- valueCount == that.valueCount &&
- Objects.equals(fileSchemaRef, that.fileSchemaRef) &&
- Objects.equals(flushFuture, that.flushFuture) &&
- Objects.equals(flushQueryLock, that.flushQueryLock) &&
- Objects.equals(memSize, that.memSize) &&
- Objects.equals(workMemTable, that.workMemTable) &&
- Objects.equals(flushMemTable, that.flushMemTable) &&
- Objects.equals(writer, that.writer) &&
- Objects.equals(beforeFlushAction, that.beforeFlushAction) &&
- Objects.equals(afterCloseAction, that.afterCloseAction) &&
- Objects.equals(afterFlushAction, that.afterFlushAction) &&
- Objects.equals(insertFile, that.insertFile) &&
- Objects.equals(currentResource, that.currentResource) &&
- Objects.equals(tsFileResources, that.tsFileResources) &&
- Objects.equals(inverseIndexOfResource, that.inverseIndexOfResource) &&
- Objects.equals(lastFlushedTimeForEachDevice, that.lastFlushedTimeForEachDevice) &&
- Objects.equals(logNode, that.logNode) &&
- Objects.equals(versionController, that.versionController);
- }
-
- @Override
- public int hashCode() {
- return Objects
- .hash(super.hashCode(), fileSchemaRef, flushFuture, flushQueryLock, memSize, fileNamePrefix,
- valueCount, workMemTable, flushMemTable, writer, beforeFlushAction, afterCloseAction,
- afterFlushAction, insertFile, currentResource, tsFileResources, inverseIndexOfResource,
- lastFlushedTimeForEachDevice, logNode, versionController);
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index d39fa39..e4ef67e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -197,26 +196,4 @@ public class QueryResourceManager {
private QueryTokenManagerHelper() {
}
}
-
-
- /**
- * TODO
- * This is only for test TsFileProcessor now. This method will finally be replaced when
- * fileNodeManager is refactored
- */
- public QueryDataSource getQueryDataSourceByTsFileProcessor(Path selectedPath,
- QueryContext context, TsFileProcessor processor)
- throws IOException, FileNodeManagerException {
- OverflowSeriesDataSource overflowSeriesDataSource = new OverflowSeriesDataSource(selectedPath);
- overflowSeriesDataSource.setOverflowInsertFileList(Collections.EMPTY_LIST);
-
- SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(selectedPath, null);
- GlobalSortedSeriesDataSource dataSource =processor.query(singleSeriesExpression, context);
- QueryDataSource queryDataSource = new QueryDataSource(dataSource, overflowSeriesDataSource);
- // add used files to current thread request cached map
- filePathsManager.addUsedFilesForGivenJob(context.getJobId(), queryDataSource);
-
- return queryDataSource;
-
- }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
index d7829ba..07073cd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
@@ -162,61 +161,4 @@ public class EngineExecutorWithoutTimeGenerator {
throw new FileNodeManagerException(e);
}
}
-
- /**
- * TODO
- * This is only for test TsFileProcessor now. This method will finally be replaced when
- * fileNodeManager is refactored
- */
- public QueryDataSet executeWithoutFilter(QueryContext context, TsFileProcessor processor)
- throws FileNodeManagerException, IOException {
-
- List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
- List<TSDataType> dataTypes = new ArrayList<>();
-
- QueryResourceManager.getInstance()
- .beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
-
- for (Path path : queryExpression.getSelectedSeries()) {
-
- QueryDataSource queryDataSource = QueryResourceManager.getInstance()
- .getQueryDataSourceByTsFileProcessor(path, context, processor);
-
- // add data type
- try {
- dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
- } catch (PathErrorException e) {
- throw new FileNodeManagerException(e);
- }
-
- // sequence insert data
- SequenceDataReader tsFilesReader;
- try {
- tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- null, context);
- } catch (IOException e) {
- throw new FileNodeManagerException(e);
- }
-
- // unseq insert data
- PriorityMergeReader unSeqMergeReader;
- try {
- unSeqMergeReader = SeriesReaderFactory.getInstance()
- .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
- } catch (IOException e) {
- throw new FileNodeManagerException(e);
- }
-
- // merge sequence data with unsequence data.
- readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
- }
-
- try {
- return new EngineDataSetWithoutTimeGenerator(queryExpression.getSelectedSeries(), dataTypes,
- readersOfSelectedSeries);
- } catch (IOException e) {
- throw new FileNodeManagerException(e);
- }
- }
-
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 9143026..96ad5ad 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
@@ -31,7 +30,6 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.groupby.GroupByWithOnlyTimeFilterDataSetDataSet;
import org.apache.iotdb.db.query.executor.groupby.GroupByWithValueFilterDataSetDataSet;
import org.apache.iotdb.db.query.fill.IFill;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -110,24 +108,6 @@ public class EngineQueryRouter {
}
/**
- * TODO
- * This is only for test TsFileProcessor now. This method will finally be replaced when
- * fileNodeManager is refactored
- */
- public QueryDataSet query(QueryExpression queryExpression, TsFileProcessor processor,
- QueryContext context)
- throws FileNodeManagerException, IOException {
-
- if (queryExpression.hasQueryFilter()) {
- throw new NotImplementedException("this function is just for test...");
- } else {
- EngineExecutorWithoutTimeGenerator engineExecutor = new EngineExecutorWithoutTimeGenerator(
- queryExpression);
- return engineExecutor.executeWithoutFilter(context, processor);
- }
- }
-
- /**
* execute groupBy query.
*
* @param selectedSeries select path list
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
deleted file mode 100644
index 22d815c..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.tsfiledata;
-
-import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
-import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.FileNodeProcessorException;
-import org.apache.iotdb.db.exception.MetadataArgsErrorException;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.executor.EngineQueryRouter;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.utils.ImmediateFuture;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TsFileProcessorTest {
- private static Logger LOGGER = LoggerFactory.getLogger(TsFileProcessorTest.class);
- TsFileProcessor processor;
- MManager mManager;
- EngineQueryRouter queryManager;
- Action doNothingAction = new Action() {
- @Override
- public void act() throws ActionException {
- }
- };
- Map<String, MeasurementSchema> measurementSchemaMap = new HashMap<>();
-
- FileSchema schema;
-
- long oldBufferwriteFileSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold();
- @Before
- public void setUp() throws Exception {
- EnvironmentUtils.cleanEnv();
- TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
- TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
-// now we do not support wal because it need to modify the wal module.
-// IoTDBDescriptor.getInstance().getConfig().setEnableWal(true);
- IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(2*1024*1024);
- mManager = MManager.getInstance();
- queryManager = new EngineQueryRouter();
- measurementSchemaMap.put("s1", new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
- measurementSchemaMap.put("s2", new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
- measurementSchemaMap.put("s3", new MeasurementSchema("s3", TSDataType.FLOAT, TSEncoding.RLE));
- schema = new FileSchema(measurementSchemaMap);
- processor = new TsFileProcessor("root.test", doNothingAction, doNothingAction, doNothingAction,
- SysTimeVersionController.INSTANCE, schema);
- mManager.setStorageLevelToMTree("root.test");
- mManager.addPathToMTree("root.test.d1.s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
- mManager.addPathToMTree("root.test.d2.s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
- mManager.addPathToMTree("root.test.d1.s2", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
- mManager.addPathToMTree("root.test.d2.s2", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
- }
-
-
- @After
- public void tearDown() throws Exception {
- //processor.close();
- // processor.writeLock();
- processor.removeMe();
- EnvironmentUtils.cleanEnv();
- IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
- IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(oldBufferwriteFileSizeThreshold);
- }
-
- @Test
- public void insert()
- throws BufferWriteProcessorException, IOException, ExecutionException, InterruptedException, FileNodeProcessorException, FileNodeManagerException, PathErrorException, MetadataArgsErrorException {
- String[] s1 = new String[]{"s1"};
- String[] s2 = new String[]{"s2"};
- String[] value = new String[]{"5.0"};
- ;
- Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 10, s1, value)));
- Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 10, s2, value)));
- Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
- Future<Boolean> ok = processor.flush();
- ok.get();
- ok = processor.flush();
- Assert.assertTrue(ok instanceof ImmediateFuture);
- ok.get();
- ok = processor.flush();
- Assert.assertTrue(ok instanceof ImmediateFuture);
- ok.get();
-
- //let's rewrite timestamp =12 again..
- Assert.assertEquals(TsFileProcessor.WRITE_REJECT_BY_TIME, processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
- processor.delete("root.test.d1", "s1",12);
- Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 12, s1, value)));
- Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 13, s1, value)));
- Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d2", 10, s1, value)));
- Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1", 14, s1, value)));
- processor.delete("root.test.d1", "s1",12);
- processor.delete("root.test.d3", "s1",12);
-
-
- QueryExpression qe = QueryExpression.create(Collections.singletonList(new Path("root.test.d1", "s1")), null);
- QueryDataSet result = queryManager.query(qe, processor, TEST_QUERY_CONTEXT);
- while (result.hasNext()) {
- RowRecord record = result.next();
- System.out.println(record.getTimestamp() +"," + record.getFields().get(0).getFloatV());
- }
- }
-
-
-
- @Test
- public void bruteForceTest() throws InterruptedException, FileNodeManagerException, IOException {
-
- String[] devices = new String[] {"root.test.d1", "root.test.d2"};
- String[] sensors = new String[] {"s1", "s2"};
- final boolean[] exception = {false, false, false};
- final boolean[] goon = {true};
- int totalsize = 50000;
-
- final int[] count = {0};
- QueryExpression qe = QueryExpression.create(Collections.singletonList(new Path("root.test.d1", "s1")), null);
- Thread insertThread = new Thread() {
- @Override
- public void run() {
- int i =0;
- long time = 100L;
- long start = System.currentTimeMillis();
- String[] sensors = new String[]{"s1"};
- String[] values = new String[1];
- try {
- for (int j = 0; j < totalsize && goon[0]; j++) {
- processor.lock(true);
-// processor.insert("root.test.d1","s1", time++, String.valueOf(j));
-// processor.insert("root.test.d2","s1", time++, String.valueOf(j));
- values[0] = String.valueOf(j);
- processor.insert(new InsertPlan("root.test.d1", time++, sensors, values));
- processor.insert(new InsertPlan("root.test.d2", time++, sensors, values));
- processor.writeUnlock();
- count[0]++;
- }
- System.out.println((System.currentTimeMillis() - start));
- } catch (BufferWriteProcessorException | IOException e) {
- // we will break out.
- LOGGER.error(e.getMessage());
- exception[0] = true;
- }
- }
- };
- Thread flushThread = new Thread() {
- @Override
- public void run() {
- try {
- for (int j = 0; j < totalsize * 2 && goon[0]; j++) {
- processor.lock(true);
- processor.flush();
- processor.writeUnlock();
- }
- } catch (IOException e) {
- // we will break out.
- LOGGER.error(e.getMessage());
- exception[1] = true;
- }
- }
- };
- //we temporary disable the query because there are bugs..
- Thread queryThread = new Thread() {
- @Override
- public void run() {
- try {
- for (int j = 0; j < totalsize * 2 && goon[0]; j++) {
- processor.lock(false);
- QueryContext context = new QueryContext(QueryResourceManager.getInstance().assignJobId());
- QueryDataSet result = queryManager.query(qe, processor, context);
- while (result.hasNext()) {
- result.next();
- }
- QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
- processor.readUnlock();
- }
- } catch (IOException | FileNodeManagerException e) {
- // we will break out.
- LOGGER.error(e.getMessage());
- exception[2] = true;
- }
- }
- };
- flushThread.start();
- insertThread.start();
- queryThread.start();
- //wait at most 20 seconds.
- insertThread.join(20000);
- goon[0] = false;
- //queryThread.join(5000);
- Assert.assertFalse(exception[0]);
- Assert.assertFalse(exception[1]);
- Assert.assertFalse(exception[2]);
- synchronized (this) {
- while (queryThread.isAlive()) {
- this.wait(50);
- }
- }
- QueryDataSet result = queryManager.query(qe, processor, TEST_QUERY_CONTEXT);
- int size =0;
- while (result.hasNext()) {
- RowRecord record = result.next();
- size ++;
- }
- //Assert.assertEquals(count[0], size);
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java
deleted file mode 100644
index ceaebb0..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.integration;
-
-
-import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
-import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
-
-import java.io.IOException;
-import java.util.Collections;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
-import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.MetadataArgsErrorException;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.executor.EngineQueryRouter;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class QueryDataFromUnclosedTsFileIT {
-
- long bufferWriteFileSize;
- FileNodeManager sgManager;
- MManager mManager;
- EngineQueryRouter queryManager;
- @Before
- public void setUp() throws IOException, FileNodeManagerException {
- EnvironmentUtils.cleanEnv();
- TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
- TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
- bufferWriteFileSize = IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold();
- //IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(100);
- sgManager = FileNodeManager.getInstance();
- mManager = MManager.getInstance();
- queryManager = new EngineQueryRouter();
- }
-
- @After
- public void tearDown() throws FileNodeManagerException, IOException {
- IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(bufferWriteFileSize);;
- //sgManager.deleteAll();
- //mManager.clear();
- EnvironmentUtils.cleanEnv();
-
- }
-
- @Test
- public void test()
- throws FileNodeManagerException, IOException, PathErrorException, MetadataArgsErrorException {
- //Path path, TSDataType dataType, TSEncoding encoding, CompressionType compressor,
- // Map<String, String> props
- mManager.setStorageLevelToMTree("root.test");
- mManager.addPathToMTree("root.test.d1.s1", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
- mManager.addPathToMTree("root.test.d2.s1", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
- sgManager.addTimeSeries(new Path("root.test.d1", "s1"), TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY, Collections
- .emptyMap());
- sgManager.addTimeSeries(new Path("root.test.d2", "s1"), TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY, Collections
- .emptyMap());
- long time = System.currentTimeMillis();
- for (int i=0; i < 20000; i++) {
- sgManager.insert(new TSRecord(i, "root.test.d1").addTuple(new IntDataPoint("s1", i)), false);
- sgManager.insert(new TSRecord(i, "root.test.d2").addTuple(new IntDataPoint("s1", i)), false);
- }
- //for (int i=0; i< 2; i++) {
- QueryExpression qe = QueryExpression
- .create(Collections.singletonList(new Path("root.test.d1", "s1")), null);
- QueryDataSet result = queryManager.query(qe, TEST_QUERY_CONTEXT);
- while (result.hasNext()) {
- RowRecord record = result.next();
- //System.out.println(record.getTimestamp() + "," + record.getFields().get(0).getIntV());
- }
- //}
-
- }
-
-}