You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/05/29 00:58:21 UTC
[incubator-iotdb] 04/05: migrate usages of FileNodeProcessor
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch refactor_overflow
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 02647ac42889806f75a674659437d59fcc0eeefe
Author: 江天 <jt...@163.com>
AuthorDate: Tue May 28 13:40:47 2019 +0800
migrate usages of FileNodeProcessor
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../org/apache/iotdb/db/engine/DatabaseEngine.java | 5 +-
...ileNodeConstants.java => EngingeConstants.java} | 16 +-
.../java/org/apache/iotdb/db/engine/Processor.java | 1 -
.../apache/iotdb/db/engine/bufferwrite/Action.java | 28 -
.../db/engine/bufferwrite/ActionException.java | 30 -
.../engine/bufferwrite/BufferWriteProcessor.java | 560 ------
.../db/engine/filenode/FileNodeFlushFuture.java | 91 -
.../iotdb/db/engine/filenode/FileNodeManager.java | 1217 ------------
.../db/engine/filenode/FileNodeProcessor.java | 2065 --------------------
.../engine/filenode/FileNodeProcessorStatus.java | 51 -
.../db/engine/filenode/FileNodeProcessorStore.java | 165 --
.../db/engine/filenode/OverflowChangeType.java | 57 -
.../db/engine/memcontrol/FlushPartialPolicy.java | 3 +-
.../db/engine/memcontrol/ForceFLushAllPolicy.java | 4 +-
.../db/engine/overflow/io/OverflowProcessor.java | 6 +-
.../db/engine/querycontext/SeriesDataSource.java | 2 +-
.../db/engine/sgmanager/StorageGroupManager.java | 7 +-
.../db/engine/sgmanager/StorageGroupProcessor.java | 7 +-
.../{filenode => sgmanager}/TsFileResource.java | 8 +-
.../RestorableTsFileIOWriter.java | 2 +-
.../db/engine/tsfiledata/TsFileProcessor.java | 14 +-
.../org/apache/iotdb/db/monitor/IStatistic.java | 4 +-
.../org/apache/iotdb/db/monitor/StatMonitor.java | 79 +-
.../iotdb/db/monitor/collector/FileSize.java | 16 +-
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 61 +-
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 21 +-
.../iotdb/db/query/control/JobFileManager.java | 2 +-
.../db/query/control/QueryResourceManager.java | 60 +-
.../EngineExecutorWithoutTimeGenerator.java | 61 +-
.../iotdb/db/query/executor/EngineQueryRouter.java | 17 -
.../db/query/factory/SeriesReaderFactory.java | 16 +-
.../query/reader/sequence/SealedTsFilesReader.java | 2 +-
.../sequence/SealedTsFilesReaderByTimestamp.java | 2 +-
.../apache/iotdb/db/service/CloseMergeService.java | 5 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 7 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 10 +-
.../java/org/apache/iotdb/db/sync/SyncUtils.java | 2 +-
.../iotdb/db/sync/receiver/SyncServiceImpl.java | 30 +-
.../org/apache/iotdb/db/utils/LoadDataUtils.java | 11 +-
.../java/org/apache/iotdb/db/utils/QueryUtils.java | 2 +-
.../recover/ExclusiveLogRecoverPerformer.java | 3 +-
.../writelog/recover/FileNodeRecoverPerformer.java | 4 +-
.../db/writelog/replay/ConcreteLogReplayer.java | 30 +-
.../engine/bufferwrite/BufferWriteBenchmark.java | 13 +-
.../bufferwrite/BufferWriteProcessorNewTest.java | 7 +-
.../bufferwrite/BufferWriteProcessorTest.java | 7 +-
.../bufferwrite/RestorableTsFileIOWriterTest.java | 1 +
.../filenodev2/FileNodeManagerBenchmark.java | 6 +-
.../memcontrol/BufferwriteFileSizeControlTest.java | 8 +-
.../memcontrol/BufferwriteMetaSizeControlTest.java | 8 +-
.../memcontrol/OverflowFileSizeControlTest.java | 10 +-
.../memcontrol/OverflowMetaSizeControlTest.java | 10 +-
.../engine/modification/DeletionFileNodeTest.java | 52 +-
.../db/engine/modification/DeletionQueryTest.java | 88 +-
.../overflow/io/OverflowProcessorBenchmark.java | 10 +-
.../engine/overflow/io/OverflowProcessorTest.java | 6 +-
.../integration/QueryDataFromUnclosedTsFileIT.java | 6 +-
.../org/apache/iotdb/db/monitor/MonitorTest.java | 20 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 10 +-
60 files changed, 303 insertions(+), 4745 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6509812..cb4b799 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -183,7 +183,7 @@ public class IoTDBConfig {
private long memThresholdDangerous = (long) (0.6 * Runtime.getRuntime().maxMemory());
/**
* MemMonitorThread will check every such interval(in ms). If memThresholdWarning is reached,
- * MemMonitorThread will inform FileNodeManager to flush.
+ * MemMonitorThread will inform DataBaseEngine to flush.
*/
private long memMonitorInterval = 1000;
/**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/DatabaseEngine.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/DatabaseEngine.java
index 7dc436e..3e4a979 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/DatabaseEngine.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/DatabaseEngine.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine;
import java.util.List;
import java.util.Map;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
/**
* DatabaseEngine is an abstraction of IoTDB storage-level interfaces.
@@ -136,7 +137,7 @@ public interface DatabaseEngine {
/**
* delete one storage group.
*/
- void deleteOneStorageGroup(String processorName) throws StorageGroupManagerException;
+ void deleteStorageGroup(String processorName) throws StorageGroupManagerException;
/**
* add time series.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/FileNodeConstants.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/EngingeConstants.java
similarity index 68%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/FileNodeConstants.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/EngingeConstants.java
index 483f7f1..56cd481 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/FileNodeConstants.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/EngingeConstants.java
@@ -16,29 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.engine.bufferwrite;
+package org.apache.iotdb.db.engine;
/**
* Constants for using in bufferwrite, overflow and filenode.
*
*/
-public class FileNodeConstants {
+public class EngingeConstants {
- private FileNodeConstants(){
+ private EngingeConstants(){
}
- public static final String FILE_NODE_OPERATOR_TYPE = "OPERATOR_TYPE";
- public static final String TIMESTAMP_KEY = "TIMESTAMP";
- public static final String FILE_NODE = "FILE_NODE";
- public static final String CLOSE_ACTION = "CLOSE_ACTION";
-
public static final String OVERFLOW_FLUSH_ACTION = "OVERFLOW_FLUSH_ACTION";
public static final String BUFFERWRITE_FLUSH_ACTION = "BUFFERWRITE_FLUSH_ACTION";
public static final String BUFFERWRITE_CLOSE_ACTION = "BUFFERWRITE_CLOSE_ACTION";
public static final String FILENODE_PROCESSOR_FLUSH_ACTION = "FILENODE_PROCESSOR_FLUSH_ACTION";
- public static final String MREGE_EXTENSION = "merge";
- public static final String ERR_EXTENSION = "err";
- public static final String PATH_SEPARATOR = ".";
- public static final String BUFFERWRITE_FILE_SEPARATOR = "-";
+ public static final String TSFILE_NAME_SEPARATOR = "-";
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
index 223523a..363c685 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
@@ -23,7 +23,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/Action.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/Action.java
deleted file mode 100644
index af65a8a..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/Action.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.bufferwrite;
-
-/**
- * Action interface.
- */
-
-public interface Action {
-
- void act() throws ActionException;
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/ActionException.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/ActionException.java
deleted file mode 100644
index 1d76d9f..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/ActionException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.bufferwrite;
-
-public class ActionException extends Exception{
-
- public ActionException(String message) {
- super(message);
- }
-
- public ActionException(Throwable cause) {
- super(cause);
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
deleted file mode 100644
index b99d0a2..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ /dev/null
@@ -1,560 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.bufferwrite;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.Processor;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
-import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
-import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
-import org.apache.iotdb.db.engine.pool.FlushManager;
-import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.exception.TsFileProcessorException;
-import org.apache.iotdb.db.qp.constant.DatetimeUtils;
-import org.apache.iotdb.db.utils.ImmediateFuture;
-import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BufferWriteProcessor extends Processor {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessor.class);
- private RestorableTsFileIOWriter writer;
- private FileSchema fileSchema;
- private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
- private ReentrantLock flushQueryLock = new ReentrantLock();
- private AtomicLong memSize = new AtomicLong();
- private long memThreshold = TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
- private IMemTable workMemTable;
- private IMemTable flushMemTable;
- private Action bufferwriteFlushAction;
- private Action bufferwriteCloseAction;
- private Action filenodeFlushAction;
-
- //lastFlushTime time unit: nanosecond
- private long lastFlushTime = -1;
- private long valueCount = 0;
-
- private String baseDir;
- private String insertFilePath;
- private String bufferWriteRelativePath;
-
- private WriteLogNode logNode;
- private VersionController versionController;
-
- private boolean isClosed = true;
- private boolean isFlush = false;
-
- /**
- * constructor of BufferWriteProcessor.
- *
- * @param baseDir base dir
- * @param processorName processor name
- * @param fileName file name
- * @param parameters parameters in Map(String, Action) structure
- * @param fileSchema file schema
- * @throws BufferWriteProcessorException BufferWriteProcessorException
- */
- public BufferWriteProcessor(String baseDir, String processorName, String fileName,
- Map<String, Action> parameters, VersionController versionController,
- FileSchema fileSchema) throws BufferWriteProcessorException {
- super(processorName);
- this.fileSchema = fileSchema;
- this.baseDir = baseDir;
-
- bufferwriteFlushAction = parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
- bufferwriteCloseAction = parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
- filenodeFlushAction = parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
-
- reopen(fileName);
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- try {
- logNode = MultiFileLogNodeManager.getInstance().getNode(
- processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
- getBufferwriteRestoreFilePath(),
- FileNodeManager.getInstance().getRestoreFilePath(processorName));
- } catch (IOException e) {
- throw new BufferWriteProcessorException(e);
- }
- }
- this.versionController = versionController;
-
- }
-
- public void reopen(String fileName) throws BufferWriteProcessorException {
- if (!isClosed) {
- return;
- }
- new File(baseDir, processorName).mkdirs();
- this.insertFilePath = Paths.get(baseDir, processorName, fileName).toString();
- bufferWriteRelativePath = processorName + File.separatorChar + fileName;
- try {
- writer = new RestorableTsFileIOWriter(processorName, insertFilePath);
- } catch (IOException e) {
- throw new BufferWriteProcessorException(e);
- }
- if (workMemTable == null) {
- workMemTable = new PrimitiveMemTable();
- } else {
- workMemTable.clear();
- }
- isClosed = false;
- isFlush = false;
- }
-
- public void checkOpen() throws BufferWriteProcessorException {
- if (isClosed) {
- throw new BufferWriteProcessorException("BufferWriteProcessor already closed");
- }
- }
-
-
- /**
- * write one data point to the buffer write.
- *
- * @param deviceId device name
- * @param measurementId sensor name
- * @param timestamp timestamp of the data point
- * @param dataType the data type of the value
- * @param value data point value
- * @return true -the size of tsfile or metadata reaches to the threshold. false -otherwise
- * @throws BufferWriteProcessorException if a flushing operation occurs and failed.
- */
- public boolean write(String deviceId, String measurementId, long timestamp, TSDataType dataType,
- String value)
- throws BufferWriteProcessorException {
- checkOpen();
- TSRecord record = new TSRecord(timestamp, deviceId);
- DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, value);
- record.addTuple(dataPoint);
- return write(record);
- }
-
- /**
- * wrete a ts record into the memtable. If the memory usage is beyond the memThreshold, an async
- * flushing operation will be called.
- *
- * @param tsRecord data to be written
- * @return FIXME what is the mean about the return value??
- * @throws BufferWriteProcessorException if a flushing operation occurs and failed.
- */
- public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException {
- checkOpen();
- long memUsage = MemUtils.getRecordSize(tsRecord);
- BasicMemController.UsageLevel level = BasicMemController.getInstance()
- .acquireUsage(this, memUsage);
-
- String memory;
- switch (level) {
- case SAFE:
- for (DataPoint dataPoint : tsRecord.dataPointList) {
- workMemTable.write(tsRecord.deviceId, dataPoint.getMeasurementId(), dataPoint.getType(),
- tsRecord.time,
- dataPoint.getValue().toString());
- }
- valueCount++;
- checkMemThreshold4Flush(memUsage);
- return true;
- case WARNING:
- memory = MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage());
- LOGGER.warn("Memory usage will exceed warning threshold, current : {}.", memory);
- for (DataPoint dataPoint : tsRecord.dataPointList) {
- workMemTable.write(tsRecord.deviceId, dataPoint.getMeasurementId(), dataPoint.getType(),
- tsRecord.time,
- dataPoint.getValue().toString());
- }
- valueCount++;
- try {
- flush();
- } catch (IOException e) {
- throw new BufferWriteProcessorException(e);
- }
- return true;
- case DANGEROUS:
- default:
- memory = MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage());
- LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.", memory);
- return false;
- }
- }
-
- private void checkMemThreshold4Flush(long addedMemory) throws BufferWriteProcessorException {
- long newMem = memSize.addAndGet(addedMemory);
- if (newMem > memThreshold) {
- String usageMem = MemUtils.bytesCntToStr(newMem);
- String threshold = MemUtils.bytesCntToStr(memThreshold);
- String processorName = getProcessorName();
- LOGGER.info("The usage of memory {} in bufferwrite processor {} reaches the threshold {}",
- usageMem, processorName, threshold);
- try {
- flush();
- } catch (IOException e) {
- LOGGER.error("Flush bufferwrite error.", e);
- throw new BufferWriteProcessorException(e);
- }
- }
- }
-
- /**
- * get the one (or two) chunk(s) in the memtable ( and the other one in flushing status and then
- * compact them into one TimeValuePairSorter). Then get its (or their) ChunkMetadata(s).
- *
- * @param deviceId device id
- * @param measurementId sensor id
- * @param dataType data type
- * @return corresponding chunk data and chunk metadata in memory
- */
- public Pair<ReadOnlyMemChunk, List<ChunkMetaData>> queryBufferWriteData(String deviceId,
- String measurementId, TSDataType dataType, Map<String, String> props)
- throws BufferWriteProcessorException {
- checkOpen();
- flushQueryLock.lock();
- try {
- MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
- if (flushMemTable != null) {
- memSeriesLazyMerger.addMemSeries(flushMemTable.query(deviceId, measurementId, dataType, props));
- }
- memSeriesLazyMerger.addMemSeries(workMemTable.query(deviceId, measurementId, dataType, props));
- // memSeriesLazyMerger has handled the props,
- // so we do not need to handle it again in the following readOnlyMemChunk
- ReadOnlyMemChunk timeValuePairSorter = new ReadOnlyMemChunk(dataType, memSeriesLazyMerger,
- Collections.emptyMap());
- return new Pair<>(timeValuePairSorter,
- writer.getMetadatas(deviceId, measurementId, dataType));
- } finally {
- flushQueryLock.unlock();
- }
- }
-
- private void switchWorkToFlush() {
- flushQueryLock.lock();
- try {
- IMemTable temp = flushMemTable == null ? new PrimitiveMemTable() : flushMemTable;
- flushMemTable = workMemTable;
- workMemTable = temp;
- isFlush = true;
- } finally {
- flushQueryLock.unlock();
- }
- }
-
- private void switchFlushToWork() {
- flushQueryLock.lock();
- try {
- flushMemTable.clear();
- writer.appendMetadata();
- isFlush = false;
- } finally {
- flushQueryLock.unlock();
- }
- }
-
-
- /**
- * the caller mast guarantee no other concurrent caller entering this function.
- *
- * @param displayMessage message that will appear in system log.
- * @param version the operation version that will tagged on the to be flushed memtable
- * (i.e., ChunkGroup)
- * @return true if successfully.
- */
- private boolean flushTask(String displayMessage, long version) {
- boolean result;
- long flushStartTime = System.currentTimeMillis();
- LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(),
- displayMessage);
- try {
- if (flushMemTable != null && !flushMemTable.isEmpty()) {
- // flush data
- MemTableFlushUtil.flushMemTable(fileSchema, writer, flushMemTable,
- version);
- // write restore information
- writer.flush();
- }
-
- filenodeFlushAction.act();
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- logNode.notifyEndFlush(null);
- }
- result = true;
- } catch (Exception e) {
- LOGGER.error(
- "The bufferwrite processor {} failed to flush {}, when calling the filenodeFlushAction.",
- getProcessorName(), displayMessage, e);
- result = false;
- } finally {
- switchFlushToWork();
- LOGGER.info("The bufferwrite processor {} ends flushing {}.", getProcessorName(),
- displayMessage);
- }
- if (LOGGER.isInfoEnabled()) {
- long flushEndTime = System.currentTimeMillis();
- LOGGER.info(
- "The bufferwrite processor {} flush {}, start time is {}, flush end time is {}, "
- + "flush time consumption is {}ms",
- getProcessorName(), displayMessage,
- DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
- DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
- flushEndTime - flushStartTime);
- }
- return result;
- }
-
- // keyword synchronized is added in this method, so that only one flush task can be submitted now.
- @Override
- public synchronized Future<Boolean> flush() throws IOException {
- if (isClosed) {
- throw new IOException("BufferWriteProcessor closed");
- }
- // statistic information for flush
- if (lastFlushTime > 0) {
- if (LOGGER.isInfoEnabled()) {
- long thisFlushTime = System.currentTimeMillis();
- LOGGER.info(
- "The bufferwrite processor {}: last flush time is {}, this flush time is {}, "
- + "flush time interval is {}s", getProcessorName(),
- DatetimeUtils.convertMillsecondToZonedDateTime(lastFlushTime / 1000),
- DatetimeUtils.convertMillsecondToZonedDateTime(thisFlushTime),
- (thisFlushTime - lastFlushTime / 1000) / 1000);
- }
- }
- lastFlushTime = System.nanoTime();
- // check value count
- // waiting for the end of last flush operation.
- try {
- flushFuture.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new IOException(e);
- }
- if (valueCount > 0) {
- // update the lastUpdatetime, prepare for flush
- try {
- bufferwriteFlushAction.act();
- } catch (Exception e) {
- LOGGER.error("Failed to flush bufferwrite row group when calling the action function.");
- throw new IOException(e);
- }
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- logNode.notifyStartFlush();
- }
- valueCount = 0;
- switchWorkToFlush();
- long version = versionController.nextVersion();
- BasicMemController.getInstance().releaseUsage(this, memSize.get());
- memSize.set(0);
- // switch
- flushFuture = FlushManager.getInstance().submit(() -> flushTask("asynchronously",
- version));
- } else {
- flushFuture = new ImmediateFuture<>(true);
- }
- return flushFuture;
- }
-
- @Override
- public boolean canBeClosed() {
- return true;
- }
-
- @Override
- public void close() throws TsFileProcessorException {
- if (isClosed) {
- return;
- }
- try {
- long closeStartTime = System.currentTimeMillis();
- // flush data and wait for finishing flush
- flush().get();
- // end file
- writer.endFile(fileSchema);
- writer = null;
- workMemTable.clear();
-
- // update the IntervalFile for interval list
- bufferwriteCloseAction.act();
- // flush the changed information for filenode
- filenodeFlushAction.act();
- // delete the restore for this bufferwrite processor
- if (LOGGER.isInfoEnabled()) {
- long closeEndTime = System.currentTimeMillis();
- LOGGER.info(
- "Close bufferwrite processor {}, the file name is {}, start time is {}, end time is {}, "
- + "time consumption is {}ms",
- getProcessorName(), insertFilePath,
- DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
- DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
- closeEndTime - closeStartTime);
- }
- isClosed = true;
- } catch (IOException e) {
- LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.",
- getProcessorName(), e);
- throw new TsFileProcessorException(e);
- } catch (Exception e) {
- LOGGER
- .error("Failed to close the bufferwrite processor when calling the action function.", e);
- throw new TsFileProcessorException(e);
- }
- }
-
- @Override
- public long memoryUsage() {
- return memSize.get();
- }
-
- /**
- * check if is flushing.
- *
- * @return True if flushing
- */
- public boolean isFlush() {
- return isFlush;
- }
-
- /**
- * get metadata size.
- *
- * @return The sum of all timeseries's metadata size within this file.
- */
- public long getMetaSize() {
- // TODO : [MemControl] implement this
- return 0;
- }
-
- /**
- * get file size.
- *
- * @return The file size of the TsFile corresponding to this processor.
- */
- public long getFileSize() {
- // TODO : save this variable to avoid object creation?
- File file = new File(insertFilePath);
- return file.length() + memoryUsage();
- }
-
- public String getBaseDir() {
- return baseDir;
- }
-
-
- public String getFileRelativePath() {
- return bufferWriteRelativePath;
- }
-
- private String getBufferwriteRestoreFilePath() {
- return writer.getRestoreFilePath();
- }
-
- public boolean isNewProcessor() {
- return writer.isNewResource();
- }
-
- public void setNewProcessor(boolean isNewProcessor) {
- writer.setNewResource(isNewProcessor);
- }
-
- public WriteLogNode getLogNode() {
- return logNode;
- }
-
- /**
- * used for test. We can know when the flush() is called.
- * @return the last flush() time. Time unit: nanosecond.
- */
- public long getLastFlushTime() {
- return lastFlushTime;
- }
-
- /**
- * used for test. We can block to wait for finishing flushing.
- * @return the future of the flush() task.
- */
- public Future<Boolean> getFlushFuture() {
- return flushFuture;
- }
-
- /**
- * Delete data whose timestamp <= 'timestamp' and belonging to timeseries deviceId.measurementId.
- * Delete data in both working MemTable and flushing MemTable.
- *
- * @param deviceId the deviceId of the timeseries to be deleted.
- * @param measurementId the measurementId of the timeseries to be deleted.
- * @param timestamp the upper-bound of deletion time.
- */
- public void delete(String deviceId, String measurementId, long timestamp)
- throws BufferWriteProcessorException {
- checkOpen();
- workMemTable.delete(deviceId, measurementId, timestamp);
- if (isFlush()) {
- // flushing MemTable cannot be directly modified since another thread is reading it
- flushMemTable = flushMemTable.copy();
- flushMemTable.delete(deviceId, measurementId, timestamp);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- return this == o;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode());
- }
-
- @Override
- public String toString() {
- return "BufferWriteProcessor in " + insertFilePath;
- }
-
- public String getInsertFilePath() {
- return insertFilePath;
- }
-
- public boolean isClosed() {
- return isClosed;
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeFlushFuture.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeFlushFuture.java
deleted file mode 100644
index 85c5bb4..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeFlushFuture.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.filenode;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.iotdb.db.utils.ImmediateFuture;
-
-public class FileNodeFlushFuture implements Future<Boolean> {
- Future<Boolean> bufferWriteFlushFuture;
- Future<Boolean> overflowFlushFuture;
- boolean hasOverflowFlushTask;
-
- public FileNodeFlushFuture(Future<Boolean> bufferWriteFlushFuture, Future<Boolean> overflowFlushFuture){
- if(bufferWriteFlushFuture != null) {
- this.bufferWriteFlushFuture = bufferWriteFlushFuture;
- } else {
- this.bufferWriteFlushFuture = new ImmediateFuture<>(true);
- }
- if(overflowFlushFuture !=null) {
- this.overflowFlushFuture = overflowFlushFuture;
- hasOverflowFlushTask = true;
- } else {
- this.overflowFlushFuture = new ImmediateFuture<>(true);
- hasOverflowFlushTask = false;
- }
- }
-
- /**
- * @param mayInterruptIfRunning true if the thread executing this task should be interrupted;
- * otherwise, in-progress tasks are allowed to complete
- * @return true if both of the two sub-future are canceled successfully.
- * @see Future#cancel(boolean) The difference is that this Future consists of two sub-Futures. If
- * the first sub-future is canceled successfully but the second sub-future fails, the result is
- * false.
- */
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- boolean bwResult = bufferWriteFlushFuture.cancel(mayInterruptIfRunning);
- boolean ofResult = overflowFlushFuture.cancel(mayInterruptIfRunning);
- return bwResult && ofResult;
- }
-
- @Override
- public boolean isCancelled() {
- return bufferWriteFlushFuture.isCancelled() && overflowFlushFuture.isCancelled();
- }
-
- @Override
- public boolean isDone() {
- return bufferWriteFlushFuture.isDone() && overflowFlushFuture.isDone();
- }
-
- @Override
- public Boolean get() throws InterruptedException, ExecutionException {
- boolean bwResult = bufferWriteFlushFuture.get();
- boolean ofResult = overflowFlushFuture.get();
- return bwResult && ofResult;
- }
-
- @Override
- public Boolean get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- boolean result = bufferWriteFlushFuture.get(timeout, unit);
- result = result && overflowFlushFuture.get(timeout, unit);
- return result;
- }
-
- public boolean isHasOverflowFlushTask() {
- return hasOverflowFlushTask;
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
deleted file mode 100644
index 0404a5c..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ /dev/null
@@ -1,1217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.filenode;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.io.FileUtils;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.Processor;
-import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
-import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
-import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
-import org.apache.iotdb.db.engine.pool.FlushManager;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.exception.StorageGroupManagerException;
-import org.apache.iotdb.db.exception.FileNodeProcessorException;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.monitor.IStatistic;
-import org.apache.iotdb.db.monitor.MonitorConstants;
-import org.apache.iotdb.db.monitor.MonitorConstants.StorageGroupManagerStatConstants;
-import org.apache.iotdb.db.monitor.MonitorConstants.StorageGroupProcessorStatConstants;
-import org.apache.iotdb.db.monitor.StatMonitor;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.service.IService;
-import org.apache.iotdb.db.service.ServiceType;
-import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FileNodeManager implements IStatistic, IService {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeManager.class);
- private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig();
- private static final Directories directories = Directories.getInstance();
- /**
- * a folder that persist FileNodeProcessorStore classes. Each stroage group will have a subfolder.
- * by default, it is system/info
- */
- private final String baseDir;
-
- /**
- * This map is used to manage all filenode processor,<br> the key is filenode name which is
- * storage group seriesPath.
- */
- private ConcurrentHashMap<String, FileNodeProcessor> processorMap;
- /**
- * This set is used to store overflowed filenode name.<br> The overflowed filenode will be merge.
- */
- private volatile FileNodeManagerStatus fileNodeManagerStatus = FileNodeManagerStatus.NONE;
- // There is no need to add concurrently
- private HashMap<String, AtomicLong> statParamsHashMap;
-
- private FileNodeManager(String baseDir) {
- processorMap = new ConcurrentHashMap<>();
- statParamsHashMap = new HashMap<>();
- //label: A
- for (StorageGroupManagerStatConstants fileNodeManagerStatConstant :
- StorageGroupManagerStatConstants.values()) {
- statParamsHashMap.put(fileNodeManagerStatConstant.name(), new AtomicLong(0));
- }
-
- String normalizedBaseDir = baseDir;
- if (normalizedBaseDir.charAt(normalizedBaseDir.length() - 1) != File.separatorChar) {
- normalizedBaseDir += Character.toString(File.separatorChar);
- }
- this.baseDir = normalizedBaseDir;
- File dir = new File(normalizedBaseDir);
- if (dir.mkdirs()) {
- LOGGER.info("{} dir home doesn't exist, create it", dir.getPath());
- }
- //TODO merge this with label A
- if (TsFileDBConf.isEnableStatMonitor()) {
- StatMonitor statMonitor = StatMonitor.getInstance();
- registerStatMetadata();
- statMonitor.registerStatistics(MonitorConstants.STAT_STORAGE_DELTA_NAME, this);
- }
- }
-
- public static FileNodeManager getInstance() {
- return FileNodeManagerHolder.INSTANCE;
- }
-
- private void updateStatHashMapWhenFail(TSRecord tsRecord) {
- statParamsHashMap.get(StorageGroupManagerStatConstants.TOTAL_REQ_FAIL.name())
- .incrementAndGet();
- statParamsHashMap.get(StorageGroupManagerStatConstants.TOTAL_POINTS_FAIL.name())
- .addAndGet(tsRecord.dataPointList.size());
- }
-
- /**
- * get stats parameter hash map.
- *
- * @return the key represents the params' name, values is AtomicLong type
- */
- @Override
- public Map<String, AtomicLong> getStatParamsHashMap() {
- return statParamsHashMap;
- }
-
- @Override
- public List<String> getAllPathForStatistic() {
- List<String> list = new ArrayList<>();
- for (StorageGroupManagerStatConstants statConstant :
- StorageGroupManagerStatConstants.values()) {
- list.add(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPARATOR
- + statConstant.name());
- }
- return list;
- }
-
- @Override
- public Map<String, TSRecord> getAllStatisticsValue() {
- long curTime = System.currentTimeMillis();
- TSRecord tsRecord = StatMonitor
- .convertToTSRecord(getStatParamsHashMap(), MonitorConstants.STAT_STORAGE_DELTA_NAME,
- curTime);
- HashMap<String, TSRecord> ret = new HashMap<>();
- ret.put(MonitorConstants.STAT_STORAGE_DELTA_NAME, tsRecord);
- return ret;
- }
-
- /**
- * Init Stat MetaDta.
- */
- @Override
- public void registerStatMetadata() {
- Map<String, String> hashMap = new HashMap<>();
- for (StorageGroupManagerStatConstants statConstant :
- StorageGroupManagerStatConstants.values()) {
- hashMap
- .put(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPARATOR
- + statConstant.name(), MonitorConstants.DATA_TYPE_INT64);
- }
- StatMonitor.getInstance().registerStatStorageGroup(hashMap);
- }
-
- /**
- * This function is just for unit test.
- */
- public synchronized void resetFileNodeManager() {
- for (String key : statParamsHashMap.keySet()) {
- statParamsHashMap.put(key, new AtomicLong());
- }
- processorMap.clear();
- }
-
- /**
- * @param filenodeName storage name, e.g., root.a.b
- */
- private FileNodeProcessor constructNewProcessor(String filenodeName)
- throws StorageGroupManagerException {
- try {
- return new FileNodeProcessor(baseDir, filenodeName);
- } catch (FileNodeProcessorException e) {
- LOGGER.error("Can't construct the FileNodeProcessor, the filenode is {}", filenodeName, e);
- throw new StorageGroupManagerException(e);
- }
- }
-
- private FileNodeProcessor getProcessor(String path, boolean isWriteLock)
- throws StorageGroupManagerException {
- String filenodeName;
- try {
- // return the stroage name
- filenodeName = MManager.getInstance().getStorageGroupByPath(path);
- } catch (PathErrorException e) {
- LOGGER.error("MManager get filenode name error, seriesPath is {}", path);
- throw new StorageGroupManagerException(e);
- }
- FileNodeProcessor processor;
- processor = processorMap.get(filenodeName);
- if (processor != null) {
- processor.lock(isWriteLock);
- } else {
- filenodeName = filenodeName.intern();
- // calculate the value with same key synchronously
- synchronized (filenodeName) {
- processor = processorMap.get(filenodeName);
- if (processor != null) {
- processor.lock(isWriteLock);
- } else {
- // calculate the value with the key monitor
- LOGGER.debug("construct a processor instance, the filenode is {}, Thread is {}",
- filenodeName, Thread.currentThread().getId());
- processor = constructNewProcessor(filenodeName);
- processor.lock(isWriteLock);
- processorMap.put(filenodeName, processor);
- }
- }
- }
- return processor;
- }
-
- /**
- * recovery the filenode processor.
- */
- public void recovery() {
- List<String> filenodeNames = null;
- try {
- filenodeNames = MManager.getInstance().getAllStorageGroups();
- } catch (PathErrorException e) {
- LOGGER.error("Restoring all FileNodes failed.", e);
- return;
- }
- for (String filenodeName : filenodeNames) {
- FileNodeProcessor fileNodeProcessor = null;
- try {
- fileNodeProcessor = getProcessor(filenodeName, true);
- if (fileNodeProcessor.shouldRecovery()) {
- LOGGER.info("Recovery the filenode processor, the filenode is {}, the status is {}",
- filenodeName, fileNodeProcessor.getFileNodeProcessorStatus());
- fileNodeProcessor.fileNodeRecovery();
- }
- } catch (StorageGroupManagerException | FileNodeProcessorException e) {
- LOGGER.error("Restoring fileNode {} failed.", filenodeName, e);
- } finally {
- if (fileNodeProcessor != null) {
- fileNodeProcessor.writeUnlock();
- }
- }
- // add index check sum
- }
- }
-
- /**
- * insert TsRecord into storage group.
- *
- * @param tsRecord input Data
- * @param isMonitor if true, the insertion is done by StatMonitor and the statistic Info will not
- * be recorded. if false, the statParamsHashMap will be updated.
- */
- public void insert(TSRecord tsRecord, boolean isMonitor) throws StorageGroupManagerException {
- long timestamp = tsRecord.time;
-
- String deviceId = tsRecord.deviceId;
- checkTimestamp(tsRecord);
- updateStat(isMonitor, tsRecord);
-
- FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
-
- try {
- long lastUpdateTime = fileNodeProcessor.getFlushLastUpdateTime(deviceId);
- if (timestamp < lastUpdateTime) {
- insertOverflow(fileNodeProcessor, timestamp, tsRecord, isMonitor, deviceId);
- } else {
- insertBufferWrite(fileNodeProcessor, timestamp, isMonitor, tsRecord, deviceId);
- }
- } catch (FileNodeProcessorException e) {
- LOGGER.error(String.format("Encounter an error when closing the buffer write processor %s.",
- fileNodeProcessor.getProcessorName()), e);
- throw new StorageGroupManagerException(e);
- } finally {
- fileNodeProcessor.writeUnlock();
- }
- // Modify the insert
- if (!isMonitor) {
- fileNodeProcessor.getStatParamsHashMap()
- .get(StorageGroupProcessorStatConstants.TOTAL_POINTS_SUCCESS.name())
- .addAndGet(tsRecord.dataPointList.size());
- fileNodeProcessor.getStatParamsHashMap()
- .get(StorageGroupProcessorStatConstants.TOTAL_REQ_SUCCESS.name())
- .incrementAndGet();
- statParamsHashMap.get(StorageGroupManagerStatConstants.TOTAL_REQ_SUCCESS.name())
- .incrementAndGet();
- statParamsHashMap
- .get(StorageGroupManagerStatConstants.TOTAL_POINTS_SUCCESS.name())
- .addAndGet(tsRecord.dataPointList.size());
- }
- }
-
- private void writeLog(TSRecord tsRecord, boolean isMonitor, WriteLogNode logNode)
- throws StorageGroupManagerException {
- try {
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- String[] measurementList = new String[tsRecord.dataPointList.size()];
- String[] insertValues = new String[tsRecord.dataPointList.size()];
- int i=0;
- for (DataPoint dp : tsRecord.dataPointList) {
- measurementList[i] = dp.getMeasurementId();
- insertValues[i] = dp.getValue().toString();
- i++;
- }
- logNode.write(new InsertPlan(2, tsRecord.deviceId, tsRecord.time, measurementList,
- insertValues));
- }
- } catch (IOException e) {
- if (!isMonitor) {
- updateStatHashMapWhenFail(tsRecord);
- }
- throw new StorageGroupManagerException(e);
- }
- }
-
- private void checkTimestamp(TSRecord tsRecord) throws StorageGroupManagerException {
- if (tsRecord.time < 0) {
- LOGGER.error("The insert time lt 0, {}.", tsRecord);
- throw new StorageGroupManagerException("The insert time lt 0, the tsrecord is " + tsRecord);
- }
- }
-
- private void updateStat(boolean isMonitor, TSRecord tsRecord) {
- if (!isMonitor) {
- statParamsHashMap.get(StorageGroupManagerStatConstants.TOTAL_POINTS.name())
- .addAndGet(tsRecord.dataPointList.size());
- }
- }
-
- private void insertOverflow(FileNodeProcessor fileNodeProcessor, long timestamp,
- TSRecord tsRecord, boolean isMonitor, String deviceId)
- throws StorageGroupManagerException {
- // get overflow processor
- OverflowProcessor overflowProcessor;
- String filenodeName = fileNodeProcessor.getProcessorName();
- try {
- overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName);
- } catch (IOException e) {
- LOGGER.error("Get the overflow processor failed, the filenode is {}, insert time is {}",
- filenodeName, timestamp);
- if (!isMonitor) {
- updateStatHashMapWhenFail(tsRecord);
- }
- throw new StorageGroupManagerException(e);
- }
- // write wal
- writeLog(tsRecord, isMonitor, overflowProcessor.getLogNode());
- // write overflow data
- try {
- overflowProcessor.insert(tsRecord);
- fileNodeProcessor.changeTypeToChanged(deviceId, timestamp);
- fileNodeProcessor.setOverflowed(true);
- } catch (IOException e) {
- LOGGER.error("Insert into overflow error, the reason is {}", e);
- if (!isMonitor) {
- updateStatHashMapWhenFail(tsRecord);
- }
- throw new StorageGroupManagerException(e);
- }
- }
-
- private void insertBufferWrite(FileNodeProcessor fileNodeProcessor, long timestamp,
- boolean isMonitor, TSRecord tsRecord, String deviceId)
- throws StorageGroupManagerException, FileNodeProcessorException {
- // get bufferwrite processor
- BufferWriteProcessor bufferWriteProcessor;
- String filenodeName = fileNodeProcessor.getProcessorName();
- try {
- bufferWriteProcessor = fileNodeProcessor.getBufferWriteProcessor(filenodeName, timestamp);
- } catch (FileNodeProcessorException e) {
- LOGGER.error("Get the bufferwrite processor failed, the filenode is {}, insert time is {}",
- filenodeName, timestamp);
- if (!isMonitor) {
- updateStatHashMapWhenFail(tsRecord);
- }
- throw new StorageGroupManagerException(e);
- }
- // Add a new interval file to newfilelist
- if (bufferWriteProcessor.isNewProcessor()) {
- bufferWriteProcessor.setNewProcessor(false);
- String bufferwriteBaseDir = bufferWriteProcessor.getBaseDir();
- String bufferwriteRelativePath = bufferWriteProcessor.getFileRelativePath();
- try {
- fileNodeProcessor.addIntervalFileNode(new File(new File(bufferwriteBaseDir), bufferwriteRelativePath));
- } catch (Exception e) {
- if (!isMonitor) {
- updateStatHashMapWhenFail(tsRecord);
- }
- throw new StorageGroupManagerException(e);
- }
- }
- // write wal
- writeLog(tsRecord, isMonitor, bufferWriteProcessor.getLogNode());
- // Write data
- long prevStartTime = fileNodeProcessor.getIntervalFileNodeStartTime(deviceId);
- long prevUpdateTime = fileNodeProcessor.getLastUpdateTime(deviceId);
-
- fileNodeProcessor.setIntervalFileNodeStartTime(deviceId);
- fileNodeProcessor.setLastUpdateTime(deviceId, timestamp);
- try {
- if (!bufferWriteProcessor.write(tsRecord)) {
- // undo time update
- fileNodeProcessor.setIntervalFileNodeStartTime(deviceId, prevStartTime);
- fileNodeProcessor.setLastUpdateTime(deviceId, prevUpdateTime);
- }
- } catch (BufferWriteProcessorException e) {
- if (!isMonitor) {
- updateStatHashMapWhenFail(tsRecord);
- }
- throw new StorageGroupManagerException(e);
- }
-
- if (bufferWriteProcessor
- .getFileSize() > IoTDBDescriptor.getInstance()
- .getConfig().getBufferwriteFileSizeThreshold()) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info(
- "The filenode processor {} will close the bufferwrite processor, "
- + "because the size[{}] of tsfile {} reaches the threshold {}",
- filenodeName, MemUtils.bytesCntToStr(bufferWriteProcessor.getFileSize()),
- bufferWriteProcessor.getInsertFilePath(), MemUtils.bytesCntToStr(
- IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold()));
- }
-
- fileNodeProcessor.closeBufferWrite();
- }
- }
-
- /**
- * update data.
- */
- public void update(String deviceId, String measurementId, long startTime, long endTime,
- TSDataType type, String v)
- throws StorageGroupManagerException {
-
- FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
- try {
-
- long lastUpdateTime = fileNodeProcessor.getLastUpdateTime(deviceId);
- if (startTime > lastUpdateTime) {
- LOGGER.warn("The update range is error, startTime {} is great than lastUpdateTime {}",
- startTime,
- lastUpdateTime);
- return;
- }
- long finalEndTime = endTime > lastUpdateTime ? lastUpdateTime : endTime;
-
- String filenodeName = fileNodeProcessor.getProcessorName();
- // get overflow processor
- OverflowProcessor overflowProcessor;
- try {
- overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName);
- } catch (IOException e) {
- LOGGER.error(
- "Get the overflow processor failed, the filenode is {}, "
- + "insert time range is from {} to {}",
- filenodeName, startTime, finalEndTime);
- throw new StorageGroupManagerException(e);
- }
- overflowProcessor.update(deviceId, measurementId, startTime, finalEndTime, type, v);
- // change the type of tsfile to overflowed
- fileNodeProcessor.changeTypeToChanged(deviceId, startTime, finalEndTime);
- fileNodeProcessor.setOverflowed(true);
-
- // write wal
- try {
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- overflowProcessor.getLogNode().write(
- new UpdatePlan(startTime, finalEndTime, v, new Path(deviceId
- + "." + measurementId)));
- }
- } catch (IOException e) {
- throw new StorageGroupManagerException(e);
- }
- } finally {
- fileNodeProcessor.writeUnlock();
- }
- }
-
- /**
- * delete data.
- */
- public void delete(String deviceId, String measurementId, long timestamp)
- throws StorageGroupManagerException {
-
- FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
- try {
- long lastUpdateTime = fileNodeProcessor.getLastUpdateTime(deviceId);
- // no tsfile data, the delete operation is invalid
- if (lastUpdateTime == -1) {
- LOGGER.warn("The last update time is -1, delete overflow is invalid, "
- + "the filenode processor is {}",
- fileNodeProcessor.getProcessorName());
- } else {
- // write wal
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- // get processors for wal
- String filenodeName = fileNodeProcessor.getProcessorName();
- OverflowProcessor overflowProcessor;
- BufferWriteProcessor bufferWriteProcessor;
- try {
- overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName);
- // in case that no BufferWriteProcessor is available, a new BufferWriteProcessor is
- // needed to access LogNode.
- // TODO this may make the time range of the next TsFile a little wider
- bufferWriteProcessor = fileNodeProcessor.getBufferWriteProcessor(filenodeName,
- lastUpdateTime + 1);
- } catch (IOException | FileNodeProcessorException e) {
- LOGGER.error("Getting the processor failed, the filenode is {}, delete time is {}.",
- filenodeName, timestamp);
- throw new StorageGroupManagerException(e);
- }
- try {
- overflowProcessor.getLogNode().write(new DeletePlan(timestamp,
- new Path(deviceId + "." + measurementId)));
- bufferWriteProcessor.getLogNode().write(new DeletePlan(timestamp,
- new Path(deviceId + "." + measurementId)));
- } catch (IOException e) {
- throw new StorageGroupManagerException(e);
- }
- }
-
- try {
- fileNodeProcessor.delete(deviceId, measurementId, timestamp);
- } catch (IOException e) {
- throw new StorageGroupManagerException(e);
- }
- // change the type of tsfile to overflowed
- fileNodeProcessor.changeTypeToChangedForDelete(deviceId, timestamp);
- fileNodeProcessor.setOverflowed(true);
-
- }
- } finally {
- fileNodeProcessor.writeUnlock();
- }
- }
-
- private void delete(String processorName,
- Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator)
- throws StorageGroupManagerException {
- if (!processorMap.containsKey(processorName)) {
- //TODO do we need to call processorIterator.remove() ?
- LOGGER.warn("The processorMap doesn't contain the filenode processor {}.", processorName);
- return;
- }
- LOGGER.info("Try to delete the filenode processor {}.", processorName);
- FileNodeProcessor processor = processorMap.get(processorName);
- if (!processor.tryWriteLock()) {
- throw new StorageGroupManagerException(String
- .format("Can't delete the filenode processor %s because Can't get the write lock.",
- processorName));
- }
-
- try {
- if (!processor.canBeClosed()) {
- LOGGER.warn("The filenode processor {} can't be deleted.", processorName);
- return;
- }
-
- try {
- LOGGER.info("Delete the filenode processor {}.", processorName);
- processor.delete();
- processorIterator.remove();
- } catch (ProcessorException e) {
- LOGGER.error("Delete the filenode processor {} by iterator error.", processorName, e);
- throw new StorageGroupManagerException(e);
- }
- } finally {
- processor.writeUnlock();
- }
- }
-
- /**
- * Similar to delete(), but only deletes data in BufferWrite. Only used by WAL recovery.
- */
- public void deleteBufferWrite(String deviceId, String measurementId, long timestamp)
- throws StorageGroupManagerException {
- FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
- try {
- fileNodeProcessor.deleteBufferWrite(deviceId, measurementId, timestamp);
- } catch (BufferWriteProcessorException | IOException e) {
- throw new StorageGroupManagerException(e);
- } finally {
- fileNodeProcessor.writeUnlock();
- }
- // change the type of tsfile to overflowed
- fileNodeProcessor.changeTypeToChangedForDelete(deviceId, timestamp);
- fileNodeProcessor.setOverflowed(true);
- }
-
- /**
- * Similar to delete(), but only deletes data in Overflow. Only used by WAL recovery.
- */
- public void deleteOverflow(String deviceId, String measurementId, long timestamp)
- throws StorageGroupManagerException {
- FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
- try {
- fileNodeProcessor.deleteOverflow(deviceId, measurementId, timestamp);
- } catch (IOException e) {
- throw new StorageGroupManagerException(e);
- } finally {
- fileNodeProcessor.writeUnlock();
- }
- // change the type of tsfile to overflowed
- fileNodeProcessor.changeTypeToChangedForDelete(deviceId, timestamp);
- fileNodeProcessor.setOverflowed(true);
- }
-
- /**
- * begin query.
- *
- * @param deviceId queried deviceId
- * @return a query token for the device.
- */
- public int beginQuery(String deviceId) throws StorageGroupManagerException {
- FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
- try {
- LOGGER.debug("Get the FileNodeProcessor: filenode is {}, begin query.",
- fileNodeProcessor.getProcessorName());
- return fileNodeProcessor.addMultiPassCount();
- } finally {
- fileNodeProcessor.writeUnlock();
- }
- }
-
- /**
- * query data.
- */
- public QueryDataSource query(SingleSeriesExpression seriesExpression, QueryContext context)
- throws StorageGroupManagerException {
- String deviceId = seriesExpression.getSeriesPath().getDevice();
- String measurementId = seriesExpression.getSeriesPath().getMeasurement();
- FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, false);
- LOGGER.debug("Get the FileNodeProcessor: filenode is {}, query.",
- fileNodeProcessor.getProcessorName());
- try {
- QueryDataSource queryDataSource;
- // query operation must have overflow processor
- if (!fileNodeProcessor.hasOverflowProcessor()) {
- try {
- fileNodeProcessor.getOverflowProcessor(fileNodeProcessor.getProcessorName());
- } catch (IOException e) {
- LOGGER.error("Get the overflow processor failed, the filenode is {}, query is {},{}",
- fileNodeProcessor.getProcessorName(), deviceId, measurementId);
- throw new StorageGroupManagerException(e);
- }
- }
- try {
- queryDataSource = fileNodeProcessor.query(deviceId, measurementId, context);
- } catch (FileNodeProcessorException e) {
- LOGGER.error("Query error: the deviceId {}, the measurementId {}", deviceId, measurementId,
- e);
- throw new StorageGroupManagerException(e);
- }
- // return query structure
- return queryDataSource;
- } finally {
- fileNodeProcessor.readUnlock();
- }
- }
-
- /**
- * end query.
- */
- public void endQuery(String deviceId, int token) throws StorageGroupManagerException {
-
- FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
- try {
- LOGGER.debug("Get the FileNodeProcessor: {} end query.",
- fileNodeProcessor.getProcessorName());
- fileNodeProcessor.decreaseMultiPassCount(token);
- } catch (FileNodeProcessorException e) {
- LOGGER.error("Failed to end query: the deviceId {}, token {}.", deviceId, token, e);
- throw new StorageGroupManagerException(e);
- } finally {
- fileNodeProcessor.writeUnlock();
- }
- }
-
- /**
- * Append one specified tsfile to the storage group. <b>This method is only provided for
- * transmission module</b>
- *
- * @param fileNodeName the seriesPath of storage group
- * @param appendFile the appended tsfile information
- */
- public boolean appendFileToFileNode(String fileNodeName, TsFileResource appendFile,
- String appendFilePath) throws StorageGroupManagerException {
- FileNodeProcessor fileNodeProcessor = getProcessor(fileNodeName, true);
- try {
- // check append file
- for (Map.Entry<String, Long> entry : appendFile.getStartTimeMap().entrySet()) {
- if (fileNodeProcessor.getLastUpdateTime(entry.getKey()) >= entry.getValue()) {
- return false;
- }
- }
- // close bufferwrite file
- fileNodeProcessor.closeBufferWrite();
- // append file to storage group.
- fileNodeProcessor.appendFile(appendFile, appendFilePath);
- } catch (FileNodeProcessorException e) {
- LOGGER.error("Cannot append the file {} to {}", appendFile.getFile().getAbsolutePath(), fileNodeName, e);
- throw new StorageGroupManagerException(e);
- } finally {
- fileNodeProcessor.writeUnlock();
- }
- return true;
- }
-
- /**
- * get all overlap tsfiles which are conflict with the appendFile.
- *
- * @param fileNodeName the seriesPath of storage group
- * @param appendFile the appended tsfile information
- */
- public List<String> getOverlapFilesFromFileNode(String fileNodeName, TsFileResource appendFile,
- String uuid) throws StorageGroupManagerException {
- FileNodeProcessor fileNodeProcessor = getProcessor(fileNodeName, true);
- List<String> overlapFiles;
- try {
- overlapFiles = fileNodeProcessor.getOverlapFiles(appendFile, uuid);
- } catch (FileNodeProcessorException e) {
- throw new StorageGroupManagerException(e);
- } finally {
- fileNodeProcessor.writeUnlock();
- }
- return overlapFiles;
- }
-
- /**
- * merge all overflowed filenode.
- *
- * @throws StorageGroupManagerException StorageGroupManagerException
- */
- public void mergeAll() throws StorageGroupManagerException {
- if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
- LOGGER.warn("Failed to merge all overflowed filenode, because filenode manager status is {}",
- fileNodeManagerStatus);
- return;
- }
-
- fileNodeManagerStatus = FileNodeManagerStatus.MERGE;
- LOGGER.info("Start to merge all overflowed filenode");
- List<String> allFileNodeNames;
- try {
- allFileNodeNames = MManager.getInstance().getAllStorageGroups();
- } catch (PathErrorException e) {
- LOGGER.error("Get all storage group seriesPath error,", e);
- throw new StorageGroupManagerException(e);
- }
- List<Future<?>> futureTasks = new ArrayList<>();
- for (String fileNodeName : allFileNodeNames) {
- FileNodeProcessor fileNodeProcessor = getProcessor(fileNodeName, true);
- try {
- Future<?> task = fileNodeProcessor.submitToMerge();
- if (task != null) {
- LOGGER.info("Submit the filenode {} to the merge pool", fileNodeName);
- futureTasks.add(task);
- }
- } finally {
- fileNodeProcessor.writeUnlock();
- }
- }
- long totalTime = 0;
- // loop waiting for merge to end, the longest waiting time is
- // 60s.
- int time = 2;
- List<Exception> mergeException = new ArrayList<>();
- for (Future<?> task : futureTasks) {
- while (!task.isDone()) {
- try {
- LOGGER.info(
- "Waiting for the end of merge, already waiting for {}s, "
- + "continue to wait anothor {}s",
- totalTime, time);
- TimeUnit.SECONDS.sleep(time);
- totalTime += time;
- time = updateWaitTime(time);
- } catch (InterruptedException e) {
- LOGGER.error("Unexpected interruption {}", e);
- Thread.currentThread().interrupt();
- }
- }
- try {
- task.get();
- } catch (InterruptedException e) {
- LOGGER.error("Unexpected interruption {}", e);
- } catch (ExecutionException e) {
- mergeException.add(e);
- LOGGER.error("The exception for merge: {}", e);
- }
- }
- if (!mergeException.isEmpty()) {
- // just throw the first exception
- throw new StorageGroupManagerException(mergeException.get(0));
- }
- fileNodeManagerStatus = FileNodeManagerStatus.NONE;
- LOGGER.info("End to merge all overflowed filenode");
- }
-
- private int updateWaitTime(int time) {
- return time < 32 ? time * 2 : 60;
- }
-
- /**
- * try to close the filenode processor. The name of filenode processor is processorName
- */
- private boolean closeOneProcessor(String processorName) throws StorageGroupManagerException {
- if (!processorMap.containsKey(processorName)) {
- return true;
- }
-
- Processor processor = processorMap.get(processorName);
- if (processor.tryWriteLock()) {
- try {
- if (processor.canBeClosed()) {
- processor.close();
- return true;
- } else {
- return false;
- }
- } catch (ProcessorException e) {
- LOGGER.error("Close the filenode processor {} error.", processorName, e);
- throw new StorageGroupManagerException(e);
- } finally {
- processor.writeUnlock();
- }
- } else {
- return false;
- }
- }
-
- /**
- * delete one filenode.
- */
- public void deleteOneFileNode(String processorName) throws StorageGroupManagerException {
- if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
- return;
- }
-
- fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
- try {
- if (processorMap.containsKey(processorName)) {
- deleteFileNodeBlocked(processorName);
- }
- String fileNodePath = TsFileDBConf.getFileNodeDir();
- fileNodePath = standardizeDir(fileNodePath) + processorName;
- FileUtils.deleteDirectory(new File(fileNodePath));
-
- cleanBufferWrite(processorName);
-
- MultiFileLogNodeManager.getInstance()
- .deleteNode(processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX);
- MultiFileLogNodeManager.getInstance()
- .deleteNode(processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
- } catch (IOException e) {
- LOGGER.error("Delete the filenode processor {} error.", processorName, e);
- throw new StorageGroupManagerException(e);
- } finally {
- fileNodeManagerStatus = FileNodeManagerStatus.NONE;
- }
- }
-
- private void cleanBufferWrite(String processorName) throws IOException {
- List<String> bufferwritePathList = directories.getAllTsFileFolders();
- for (String bufferwritePath : bufferwritePathList) {
- bufferwritePath = standardizeDir(bufferwritePath) + processorName;
- File bufferDir = new File(bufferwritePath);
- // free and close the streams under this bufferwrite directory
- if (!bufferDir.exists()) {
- continue;
- }
- File[] bufferFiles = bufferDir.listFiles();
- if (bufferFiles != null) {
- for (File bufferFile : bufferFiles) {
- FileReaderManager.getInstance().closeFileAndRemoveReader(bufferFile.getPath());
- }
- }
- FileUtils.deleteDirectory(new File(bufferwritePath));
- }
- }
-
- private void deleteFileNodeBlocked(String processorName) throws StorageGroupManagerException {
- LOGGER.info("Forced to delete the filenode processor {}", processorName);
- FileNodeProcessor processor = processorMap.get(processorName);
- while (true) {
- if (processor.tryWriteLock()) {
- try {
- if (processor.canBeClosed()) {
- LOGGER.info("Delete the filenode processor {}.", processorName);
- processor.delete();
- processorMap.remove(processorName);
- break;
- } else {
- LOGGER.info(
- "Can't delete the filenode processor {}, "
- + "because the filenode processor can't be closed."
- + " Wait 100ms to retry");
- }
- } catch (ProcessorException e) {
- LOGGER.error("Delete the filenode processor {} error.", processorName, e);
- throw new StorageGroupManagerException(e);
- } finally {
- processor.writeUnlock();
- }
- } else {
- LOGGER.info(
- "Can't delete the filenode processor {}, because it can't get the write lock."
- + " Wait 100ms to retry", processorName);
- }
- try {
- TimeUnit.MILLISECONDS.sleep(100);
- } catch (InterruptedException e) {
- LOGGER.error(e.getMessage());
- Thread.currentThread().interrupt();
- }
- }
- }
-
- private String standardizeDir(String originalPath) {
- String res = originalPath;
- if ((originalPath.length() > 0
- && originalPath.charAt(originalPath.length() - 1) != File.separatorChar)
- || originalPath.length() == 0) {
- res = originalPath + File.separatorChar;
- }
- return res;
- }
-
- /**
- * add time series.
- */
- public void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding,
- CompressionType compressor,
- Map<String, String> props) throws StorageGroupManagerException {
- FileNodeProcessor fileNodeProcessor = getProcessor(path.getFullPath(), true);
- try {
- fileNodeProcessor.addTimeSeries(path.getMeasurement(), dataType, encoding, compressor, props);
- } finally {
- fileNodeProcessor.writeUnlock();
- }
- }
-
-
- /**
- * Force to close the filenode processor.
- */
- public void closeOneFileNode(String processorName) throws StorageGroupManagerException {
- if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
- return;
- }
-
- fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
- try {
- LOGGER.info("Force to close the filenode processor {}.", processorName);
- while (!closeOneProcessor(processorName)) {
- try {
- LOGGER.info("Can't force to close the filenode processor {}, wait 100ms to retry",
- processorName);
- TimeUnit.MILLISECONDS.sleep(100);
- } catch (InterruptedException e) {
- // ignore the interrupted exception
- LOGGER.error("Unexpected interruption {}", e);
- Thread.currentThread().interrupt();
- }
- }
- } finally {
- fileNodeManagerStatus = FileNodeManagerStatus.NONE;
- }
- }
-
- /**
- * try to close the filenode processor.
- */
- private void close(String processorName) throws StorageGroupManagerException {
- if (!processorMap.containsKey(processorName)) {
- LOGGER.warn("The processorMap doesn't contain the filenode processor {}.", processorName);
- return;
- }
- LOGGER.info("Try to close the filenode processor {}.", processorName);
- FileNodeProcessor processor = processorMap.get(processorName);
- if (!processor.tryWriteLock()) {
- LOGGER.warn("Can't get the write lock of the filenode processor {}.", processorName);
- return;
- }
- try {
- if (processor.canBeClosed()) {
- try {
- LOGGER.info("Close the filenode processor {}.", processorName);
- processor.close();
- } catch (ProcessorException e) {
- LOGGER.error("Close the filenode processor {} error.", processorName, e);
- throw new StorageGroupManagerException(e);
- }
- } else {
- LOGGER.warn("The filenode processor {} can't be closed.", processorName);
- }
- } finally {
- processor.writeUnlock();
- }
- }
-
- /**
- * delete all filenode.
- */
- public synchronized boolean deleteAll() throws StorageGroupManagerException {
- LOGGER.info("Start deleting all filenode");
- if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
- LOGGER.info("Failed to delete all filenode processor because of merge operation");
- return false;
- }
-
- fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
- try {
- Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator = processorMap.entrySet()
- .iterator();
- while (processorIterator.hasNext()) {
- Map.Entry<String, FileNodeProcessor> processorEntry = processorIterator.next();
- delete(processorEntry.getKey(), processorIterator);
- }
- return processorMap.isEmpty();
- } finally {
- LOGGER.info("Deleting all FileNodeProcessors ends");
- fileNodeManagerStatus = FileNodeManagerStatus.NONE;
- }
- }
-
- /**
- * Try to close All.
- */
- public void closeAll() throws StorageGroupManagerException {
- LOGGER.info("Start closing all filenode processor");
- if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
- LOGGER.info("Failed to close all filenode processor because of merge operation");
- return;
- }
- fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
- try {
- for (Map.Entry<String, FileNodeProcessor> processorEntry : processorMap.entrySet()) {
- close(processorEntry.getKey());
- }
- } finally {
- LOGGER.info("Close all FileNodeProcessors ends");
- fileNodeManagerStatus = FileNodeManagerStatus.NONE;
- }
- }
-
- /**
- * force flush to control memory usage.
- */
- public void forceFlush(BasicMemController.UsageLevel level) {
- // you may add some delicate process like below
- // or you could provide multiple methods for different urgency
- switch (level) {
- // only select the most urgent (most active or biggest in size)
- // processors to flush
- // only select top 10% active memory user to flush
- case WARNING:
- try {
- flushTop(0.1f);
- } catch (IOException e) {
- LOGGER.error("force flush memory data error: {}", e);
- }
- break;
- // force all processors to flush
- case DANGEROUS:
- try {
- flushAll();
- } catch (IOException e) {
- LOGGER.error("force flush memory data error: {}", e);
- }
- break;
- // if the flush thread pool is not full ( or half full), start a new
- // flush task
- case SAFE:
- if (FlushManager.getInstance().getActiveCnt() < 0.5 * FlushManager.getInstance()
- .getThreadCnt()) {
- try {
- flushTop(0.01f);
- } catch (IOException e) {
- LOGGER.error("force flush memory data error: ", e);
- }
- }
- break;
- default:
- }
- }
-
- private void flushAll() throws IOException {
- for (FileNodeProcessor processor : processorMap.values()) {
- if (!processor.tryLock(true)) {
- continue;
- }
- try {
- boolean isMerge = processor.flush().isHasOverflowFlushTask();
- if (isMerge) {
- processor.submitToMerge();
- }
- } finally {
- processor.unlock(true);
- }
- }
- }
-
- private void flushTop(float percentage) throws IOException {
- List<FileNodeProcessor> tempProcessors = new ArrayList<>(processorMap.values());
- // sort the tempProcessors as descending order
- tempProcessors.sort((o1, o2) -> (int) (o2.memoryUsage() - o1.memoryUsage()));
- int flushNum =
- (int) (tempProcessors.size() * percentage) > 1
- ? (int) (tempProcessors.size() * percentage)
- : 1;
- for (int i = 0; i < flushNum && i < tempProcessors.size(); i++) {
- FileNodeProcessor processor = tempProcessors.get(i);
- // 64M
- if (processor.memoryUsage() <= TSFileConfig.groupSizeInByte / 2) {
- continue;
- }
- processor.writeLock();
- try {
- boolean isMerge = processor.flush().isHasOverflowFlushTask();
- if (isMerge) {
- processor.submitToMerge();
- }
- } finally {
- processor.writeUnlock();
- }
- }
- }
-
- @Override
- public void start() {
- // do no thing
- }
-
- @Override
- public void stop() {
- try {
- closeAll();
- } catch (StorageGroupManagerException e) {
- LOGGER.error("Failed to close file node manager because .", e);
- }
- }
-
- @Override
- public ServiceType getID() {
- return ServiceType.FILE_NODE_SERVICE;
- }
-
- /**
- * get restore file path.
- */
- public String getRestoreFilePath(String processorName) {
- FileNodeProcessor fileNodeProcessor = processorMap.get(processorName);
- if (fileNodeProcessor != null) {
- return fileNodeProcessor.getFileNodeRestoreFilePath();
- } else {
- return null;
- }
- }
-
- /**
- * recover filenode.
- */
- public void recoverFileNode(String filenodeName)
- throws StorageGroupManagerException {
- FileNodeProcessor fileNodeProcessor = getProcessor(filenodeName, true);
- LOGGER.info("Recover the filenode processor, the filenode is {}, the status is {}",
- filenodeName, fileNodeProcessor.getFileNodeProcessorStatus());
- try {
- fileNodeProcessor.fileNodeRecovery();
- } catch (FileNodeProcessorException e) {
- throw new StorageGroupManagerException(e);
- } finally {
- fileNodeProcessor.writeUnlock();
- }
- }
-
- private enum FileNodeManagerStatus {
- NONE, MERGE, CLOSE
- }
-
- private static class FileNodeManagerHolder {
-
- private FileNodeManagerHolder() {
- }
-
- private static final FileNodeManager INSTANCE = new FileNodeManager(
- TsFileDBConf.getFileNodeDir());
- }
-
-}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
deleted file mode 100644
index a66116e..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ /dev/null
@@ -1,2065 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenode;
-
-import static java.time.ZonedDateTime.ofInstant;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.Processor;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
-import org.apache.iotdb.db.engine.pool.MergeManager;
-import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
-import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
-import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
-import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
-import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.exception.ErrorDebugException;
-import org.apache.iotdb.db.exception.FileNodeProcessorException;
-import org.apache.iotdb.db.exception.OverflowProcessorException;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.exception.TsFileProcessorException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.monitor.IStatistic;
-import org.apache.iotdb.db.monitor.MonitorConstants;
-import org.apache.iotdb.db.monitor.MonitorConstants.StorageGroupProcessorStatConstants;
-import org.apache.iotdb.db.monitor.StatMonitor;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
-import org.apache.iotdb.db.query.reader.IReader;
-import org.apache.iotdb.db.sync.conf.Constans;
-import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.db.utils.QueryUtils;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-import org.apache.iotdb.tsfile.read.filter.TimeFilter;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
-import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FileNodeProcessor extends Processor implements IStatistic {
-
- private static final String WARN_NO_SUCH_OVERFLOWED_FILE = "Can not find any tsfile which"
- + " will be overflowed in the filenode processor {}, ";
- private static final String RESTORE_FILE_SUFFIX = ".restore";
- private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeProcessor.class);
- private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig();
- private static final MManager mManager = MManager.getInstance();
- private static final Directories directories = Directories.getInstance();
- private final String statStorageDeltaName;
- private final HashMap<String, AtomicLong> statParamsHashMap = new HashMap<>();
- /**
- * Used to keep the oldest timestamp for each deviceId. The key is deviceId.
- */
- private volatile boolean isOverflowed;
- private Map<String, Long> lastUpdateTimeMap;
- private Map<String, Long> flushLastUpdateTimeMap;
- private Map<String, List<TsFileResource>> invertedIndexOfFiles;
- private TsFileResource emptyTsFileResource;
- private TsFileResource currentTsFileResource;
- private List<TsFileResource> newFileNodes;
- private FileNodeProcessorStatus isMerging;
-
- /**
- * this is used when work->merge operation
- */
- private int numOfMergeFile;
- private FileNodeProcessorStore fileNodeProcessorStore;
- private String fileNodeRestoreFilePath;
- private final Object fileNodeRestoreLock = new Object();
-
- /**
- * last merge time
- */
- private long lastMergeTime = -1;
- private BufferWriteProcessor bufferWriteProcessor = null;
- private OverflowProcessor overflowProcessor1 = null;
- private Set<Integer> oldMultiPassTokenSet = null;
- private Set<Integer> newMultiPassTokenSet = new HashSet<>();
-
- /**
- * Represent the number of old queries that have not ended.
- * This parameter only decreases but not increase.
- */
- private CountDownLatch oldMultiPassCount = null;
-
- /**
- * Represent the number of new queries that have not ended.
- */
- private AtomicInteger newMultiPassCount = new AtomicInteger(0);
- /**
- * system recovery
- */
- private boolean shouldRecovery = false;
- /**
- * statistic monitor parameters
- */
- private Map<String, Action> parameters;
- private FileSchema fileSchema;
- private Action flushFileNodeProcessorAction = () -> {
- synchronized (fileNodeProcessorStore) {
- try {
- writeStoreToDisk(fileNodeProcessorStore);
- } catch (FileNodeProcessorException e) {
- throw new ActionException(e);
- }
- }
- };
- private Action bufferwriteFlushAction = () -> {
- // update the lastUpdateTime Notice: Thread safe
- synchronized (fileNodeProcessorStore) {
- // deep copy
- Map<String, Long> tempLastUpdateMap = new HashMap<>(lastUpdateTimeMap);
- // update flushLastUpdateTimeMap
- for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
- flushLastUpdateTimeMap.put(entry.getKey(), entry.getValue() + 1);
- }
- fileNodeProcessorStore.setLastUpdateTimeMap(tempLastUpdateMap);
- }
- };
-
- private Action bufferwriteCloseAction = new Action() {
-
- @Override
- public void act() {
- synchronized (fileNodeProcessorStore) {
- fileNodeProcessorStore.setLastUpdateTimeMap(lastUpdateTimeMap);
- addLastTimeToIntervalFile();
- fileNodeProcessorStore.setNewFileNodes(newFileNodes);
- }
- }
-
- private void addLastTimeToIntervalFile() {
-
- if (!newFileNodes.isEmpty()) {
- // end time with one start time
- Map<String, Long> endTimeMap = new HashMap<>();
- for (Entry<String, Long> startTime : currentTsFileResource.getStartTimeMap().entrySet()) {
- String deviceId = startTime.getKey();
- endTimeMap.put(deviceId, lastUpdateTimeMap.get(deviceId));
- }
- currentTsFileResource.setEndTimeMap(endTimeMap);
- }
- }
- };
- private Action overflowFlushAction = () -> {
-
- // update the new TsFileResource List and emptyIntervalFile.
- // Notice: thread safe
- synchronized (fileNodeProcessorStore) {
- fileNodeProcessorStore.setOverflowed(isOverflowed);
- fileNodeProcessorStore.setEmptyTsFileResource(emptyTsFileResource);
- fileNodeProcessorStore.setNewFileNodes(newFileNodes);
- }
- };
- // Token for query which used to
- private int multiPassLockToken = 0;
- private VersionController versionController;
- private ReentrantLock mergeDeleteLock = new ReentrantLock();
-
- /**
- * This is the modification file of the result of the current merge.
- */
- private ModificationFile mergingModification;
-
- private TsFileIOWriter mergeFileWriter = null;
- private String mergeOutputPath = null;
- private String mergeBaseDir = null;
- private String mergeFileName = null;
- private boolean mergeIsChunkGroupHasData = false;
- private long mergeStartPos;
-
- /**
- * constructor of FileNodeProcessor.
- */
- FileNodeProcessor(String fileNodeDirPath, String processorName)
- throws FileNodeProcessorException {
- super(processorName);
- for (StorageGroupProcessorStatConstants statConstant :
- StorageGroupProcessorStatConstants.values()) {
- statParamsHashMap.put(statConstant.name(), new AtomicLong(0));
- }
- statStorageDeltaName =
- MonitorConstants.STAT_STORAGE_GROUP_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR
- + MonitorConstants.FILE_NODE_PATH + MonitorConstants.MONITOR_PATH_SEPARATOR
- + processorName.replaceAll("\\.", "_");
-
- this.parameters = new HashMap<>();
- String dirPath = fileNodeDirPath;
- if (dirPath.length() > 0
- && dirPath.charAt(dirPath.length() - 1) != File.separatorChar) {
- dirPath = dirPath + File.separatorChar;
- }
-
- File restoreFolder = new File(dirPath + processorName);
- if (!restoreFolder.exists()) {
- restoreFolder.mkdirs();
- LOGGER.info(
- "The restore directory of the filenode processor {} doesn't exist. Create new " +
- "directory {}",
- getProcessorName(), restoreFolder.getAbsolutePath());
- }
- fileNodeRestoreFilePath = new File(restoreFolder, processorName + RESTORE_FILE_SUFFIX)
- .getPath();
- try {
- fileNodeProcessorStore = readStoreFromDisk();
- } catch (FileNodeProcessorException e) {
- LOGGER.error(
- "The fileNode processor {} encountered an error when recoverying restore " +
- "information.", processorName);
- throw new FileNodeProcessorException(e);
- }
- // TODO deep clone the lastupdate time
- lastUpdateTimeMap = fileNodeProcessorStore.getLastUpdateTimeMap();
- emptyTsFileResource = fileNodeProcessorStore.getEmptyTsFileResource();
- newFileNodes = fileNodeProcessorStore.getNewFileNodes();
- isMerging = fileNodeProcessorStore.getFileNodeProcessorStatus();
- numOfMergeFile = fileNodeProcessorStore.getNumOfMergeFile();
- invertedIndexOfFiles = new HashMap<>();
- // deep clone
- flushLastUpdateTimeMap = new HashMap<>();
- for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
- flushLastUpdateTimeMap.put(entry.getKey(), entry.getValue() + 1);
- }
- // construct the fileschema
- try {
- this.fileSchema = constructFileSchema(processorName);
- } catch (WriteProcessException e) {
- throw new FileNodeProcessorException(e);
- }
- // status is not NONE, or the last intervalFile is not closed
- if (isMerging != FileNodeProcessorStatus.NONE
- || (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 1).isClosed())) {
- shouldRecovery = true;
- } else {
- // add file into the index of file
- addAllFileIntoIndex(newFileNodes);
- }
- // RegistStatService
- if (TsFileDBConf.isEnableStatMonitor()) {
- StatMonitor statMonitor = StatMonitor.getInstance();
- registerStatMetadata();
- statMonitor.registerStatistics(statStorageDeltaName, this);
- }
- try {
- versionController = new SimpleFileVersionController(restoreFolder.getPath());
- } catch (IOException e) {
- throw new FileNodeProcessorException(e);
- }
- }
-
- @Override
- public Map<String, AtomicLong> getStatParamsHashMap() {
- return statParamsHashMap;
- }
-
- @Override
- public void registerStatMetadata() {
- Map<String, String> hashMap = new HashMap<>();
- for (StorageGroupProcessorStatConstants statConstant :
- StorageGroupProcessorStatConstants.values()) {
- hashMap
- .put(statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPARATOR + statConstant.name(),
- MonitorConstants.DATA_TYPE_INT64);
- }
- StatMonitor.getInstance().registerStatStorageGroup(hashMap);
- }
-
- @Override
- public List<String> getAllPathForStatistic() {
- List<String> list = new ArrayList<>();
- for (StorageGroupProcessorStatConstants statConstant :
- StorageGroupProcessorStatConstants.values()) {
- list.add(
- statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPARATOR + statConstant.name());
- }
- return list;
- }
-
- @Override
- public Map<String, TSRecord> getAllStatisticsValue() {
- Long curTime = System.currentTimeMillis();
- HashMap<String, TSRecord> tsRecordHashMap = new HashMap<>();
- TSRecord tsRecord = new TSRecord(curTime, statStorageDeltaName);
-
- Map<String, AtomicLong> hashMap = getStatParamsHashMap();
- tsRecord.dataPointList = new ArrayList<>();
- for (Map.Entry<String, AtomicLong> entry : hashMap.entrySet()) {
- tsRecord.dataPointList.add(new LongDataPoint(entry.getKey(), entry.getValue().get()));
- }
-
- tsRecordHashMap.put(statStorageDeltaName, tsRecord);
- return tsRecordHashMap;
- }
-
- /**
- * add interval FileNode.
- */
- void addIntervalFileNode(File file) throws ActionException, IOException {
-
- TsFileResource tsFileResource = new TsFileResource(file, false);
- this.currentTsFileResource = tsFileResource;
- newFileNodes.add(tsFileResource);
- fileNodeProcessorStore.setNewFileNodes(newFileNodes);
- flushFileNodeProcessorAction.act();
- }
-
- /**
- * set interval filenode start time.
- *
- * @param deviceId device ID
- */
- void setIntervalFileNodeStartTime(String deviceId) {
- if (currentTsFileResource.getStartTime(deviceId) == -1) {
- currentTsFileResource.setStartTime(deviceId, flushLastUpdateTimeMap.get(deviceId));
- if (!invertedIndexOfFiles.containsKey(deviceId)) {
- invertedIndexOfFiles.put(deviceId, new ArrayList<>());
- }
- invertedIndexOfFiles.get(deviceId).add(currentTsFileResource);
- }
- }
-
- void setIntervalFileNodeStartTime(String deviceId, long time) {
- if (time != -1) {
- currentTsFileResource.setStartTime(deviceId, time);
- } else {
- currentTsFileResource.removeTime(deviceId);
- invertedIndexOfFiles.get(deviceId).remove(currentTsFileResource);
- }
- }
-
- long getIntervalFileNodeStartTime(String deviceId) {
- return currentTsFileResource.getStartTime(deviceId);
- }
-
- private void addAllFileIntoIndex(List<TsFileResource> fileList) {
- // clear map
- invertedIndexOfFiles.clear();
- // add all file to index
- for (TsFileResource fileNode : fileList) {
- if (fileNode.getStartTimeMap().isEmpty()) {
- continue;
- }
- for (String deviceId : fileNode.getStartTimeMap().keySet()) {
- if (!invertedIndexOfFiles.containsKey(deviceId)) {
- invertedIndexOfFiles.put(deviceId, new ArrayList<>());
- }
- invertedIndexOfFiles.get(deviceId).add(fileNode);
- }
- }
- }
-
- public boolean shouldRecovery() {
- return shouldRecovery;
- }
-
- public boolean isOverflowed() {
- return isOverflowed;
- }
-
- /**
- * if overflow insert, update and delete write into this filenode processor, set
- * <code>isOverflowed</code> to true.
- */
- public void setOverflowed(boolean isOverflowed) {
- if (this.isOverflowed != isOverflowed) {
- this.isOverflowed = isOverflowed;
- }
- }
-
- public FileNodeProcessorStatus getFileNodeProcessorStatus() {
- return isMerging;
- }
-
- /**
- * execute filenode recovery.
- */
- public void fileNodeRecovery() throws FileNodeProcessorException {
- // restore bufferwrite
- if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 1).isClosed()) {
- //
- // add the current file
- //
- currentTsFileResource = newFileNodes.get(newFileNodes.size() - 1);
-
- // this bufferwrite file is not close by normal operation
- String damagedFilePath = newFileNodes.get(newFileNodes.size() - 1).getFile().getAbsolutePath();
- String[] fileNames = damagedFilePath.split("\\" + File.separator);
- // all information to recovery the damaged file.
- // contains file seriesPath, action parameters and processorName
- parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bufferwriteFlushAction);
- parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bufferwriteCloseAction);
- parameters
- .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
- String baseDir = directories
- .getTsFileFolder(newFileNodes.get(newFileNodes.size() - 1).getBaseDirIndex());
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info(
- "The filenode processor {} will recovery the bufferwrite processor, "
- + "the bufferwrite file is {}",
- getProcessorName(), fileNames[fileNames.length - 1]);
- }
-
- try {
- bufferWriteProcessor = new BufferWriteProcessor(baseDir, getProcessorName(),
- fileNames[fileNames.length - 1], parameters, versionController, fileSchema);
- } catch (BufferWriteProcessorException e) {
- LOGGER.error(
- "The filenode processor {} failed to recovery the bufferwrite processor, "
- + "the last bufferwrite file is {}.",
- getProcessorName(), fileNames[fileNames.length - 1]);
- throw new FileNodeProcessorException(e);
- }
- }
- // restore the overflow processor
- LOGGER.info("The filenode processor {} will recovery the overflow processor.",
- getProcessorName());
- try {
- overflowProcessor1 = new OverflowProcessor(getProcessorName(), overflowFlushAction,
- flushFileNodeProcessorAction, ()->{}, versionController, fileSchema);
- } catch (TsFileProcessorException | IOException e) {
- LOGGER.error("The filenode processor {} failed to recovery the overflow processor.",
- getProcessorName());
- throw new FileNodeProcessorException(e);
- }
-
- shouldRecovery = false;
-
- if (isMerging == FileNodeProcessorStatus.MERGING_WRITE) {
- // re-merge all file
- // if bufferwrite processor is not null, and close
- LOGGER.info("The filenode processor {} is recovering, the filenode status is {}.",
- getProcessorName(), isMerging);
- merge();
- } else if (isMerging == FileNodeProcessorStatus.WAITING) {
- // unlock
- LOGGER.info("The filenode processor {} is recovering, the filenode status is {}.",
- getProcessorName(), isMerging);
- //writeUnlock();
- switchWaitingToWorking();
- } else {
- //writeUnlock();
- }
- // add file into index of file
- addAllFileIntoIndex(newFileNodes);
- }
-
- /**
- * get buffer write processor by processor name and insert time.
- */
- public BufferWriteProcessor getBufferWriteProcessor(String processorName, long insertTime)
- throws FileNodeProcessorException {
- if (bufferWriteProcessor == null) {
- Map<String, Action> params = new HashMap<>();
- params.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bufferwriteFlushAction);
- params.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bufferwriteCloseAction);
- params
- .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
- String baseDir = directories.getNextFolderForTsfile();
- LOGGER.info("Allocate folder {} for the new bufferwrite processor.", baseDir);
- // construct processor or restore
- try {
- bufferWriteProcessor = new BufferWriteProcessor(baseDir, processorName,
- insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
- + System.currentTimeMillis(),
- params, versionController, fileSchema);
- } catch (BufferWriteProcessorException e) {
- throw new FileNodeProcessorException(String
- .format("The filenode processor %s failed to get the bufferwrite processor.",
- processorName), e);
- }
- } else if (bufferWriteProcessor.isClosed()) {
- try {
- bufferWriteProcessor.reopen(insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
- + System.currentTimeMillis());
- } catch (BufferWriteProcessorException e) {
- throw new FileNodeProcessorException("Cannot reopen BufferWriteProcessor", e);
- }
- }
- return bufferWriteProcessor;
- }
-
- /**
- * get overflow processor by processor name.
- */
- public OverflowProcessor getOverflowProcessor(String processorName)
- throws IOException, TsFileProcessorException {
- if (overflowProcessor1 == null) {
- Map<String, Action> params = new HashMap<>();
- // construct processor or restore
- params.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
- params
- .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
- overflowProcessor1 = new OverflowProcessor(getProcessorName(), overflowFlushAction,
- flushFileNodeProcessorAction, ()->{}, versionController, fileSchema);
- } else if (overflowProcessor1.isClosed()) {
- overflowProcessor1.reopen();
- }
- return overflowProcessor1;
- }
-
- /**
- * get overflow processor.
- */
- public OverflowProcessor getOverflowProcessor() {
- if (overflowProcessor == null || overflowProcessor.isClosed()) {
- LOGGER.error("The overflow processor is null when getting the overflowProcessor");
- }
- return overflowProcessor;
- }
-
- public boolean hasOverflowProcessor() {
- return overflowProcessor != null && !overflowProcessor.isClosed();
- }
-
- public void setBufferwriteProcessroToClosed() {
-
- bufferWriteProcessor = null;
- }
-
- public boolean hasBufferwriteProcessor() {
-
- return bufferWriteProcessor != null;
- }
-
- /**
- * set last update time.
- */
- public void setLastUpdateTime(String deviceId, long timestamp) {
- if (!lastUpdateTimeMap.containsKey(deviceId) || lastUpdateTimeMap.get(deviceId) < timestamp) {
- lastUpdateTimeMap.put(deviceId, timestamp);
- }
- if (timestamp == -1) {
- lastUpdateTimeMap.remove(deviceId);
- }
- }
-
- /**
- * get last update time.
- */
- public long getLastUpdateTime(String deviceId) {
-
- if (lastUpdateTimeMap.containsKey(deviceId)) {
- return lastUpdateTimeMap.get(deviceId);
- } else {
- return -1;
- }
- }
-
- /**
- * get flush last update time.
- */
- public long getFlushLastUpdateTime(String deviceId) {
- if (!flushLastUpdateTimeMap.containsKey(deviceId)) {
- flushLastUpdateTimeMap.put(deviceId, 0L);
- }
- return flushLastUpdateTimeMap.get(deviceId);
- }
-
- public Map<String, Long> getLastUpdateTimeMap() {
- return lastUpdateTimeMap;
- }
-
- /**
- * For insert overflow.
- */
- public void changeTypeToChanged(String deviceId, long timestamp) {
- if (!invertedIndexOfFiles.containsKey(deviceId)) {
- LOGGER.warn(
- WARN_NO_SUCH_OVERFLOWED_FILE
- + "the data is [device:{},time:{}]",
- getProcessorName(), deviceId, timestamp);
- emptyTsFileResource.setStartTime(deviceId, 0L);
- emptyTsFileResource.setEndTime(deviceId, getLastUpdateTime(deviceId));
- emptyTsFileResource.changeTypeToChanged(isMerging);
- } else {
- List<TsFileResource> temp = invertedIndexOfFiles.get(deviceId);
- int index = searchIndexNodeByTimestamp(deviceId, timestamp, temp);
- changeTypeToChanged(temp.get(index), deviceId);
- }
- }
-
- private void changeTypeToChanged(TsFileResource fileNode, String deviceId) {
- fileNode.changeTypeToChanged(isMerging);
- if (isMerging == FileNodeProcessorStatus.MERGING_WRITE) {
- fileNode.addMergeChanged(deviceId);
- }
- }
-
- /**
- * For update overflow.
- */
- public void changeTypeToChanged(String deviceId, long startTime, long endTime) {
- if (!invertedIndexOfFiles.containsKey(deviceId)) {
- LOGGER.warn(
- WARN_NO_SUCH_OVERFLOWED_FILE
- + "the data is [device:{}, start time:{}, end time:{}]",
- getProcessorName(), deviceId, startTime, endTime);
- emptyTsFileResource.setStartTime(deviceId, 0L);
- emptyTsFileResource.setEndTime(deviceId, getLastUpdateTime(deviceId));
- emptyTsFileResource.changeTypeToChanged(isMerging);
- } else {
- List<TsFileResource> temp = invertedIndexOfFiles.get(deviceId);
- int left = searchIndexNodeByTimestamp(deviceId, startTime, temp);
- int right = searchIndexNodeByTimestamp(deviceId, endTime, temp);
- for (int i = left; i <= right; i++) {
- changeTypeToChanged(temp.get(i), deviceId);
- }
- }
- }
-
- /**
- * For delete overflow.
- */
- public void changeTypeToChangedForDelete(String deviceId, long timestamp) {
- if (!invertedIndexOfFiles.containsKey(deviceId)) {
- LOGGER.warn(
- WARN_NO_SUCH_OVERFLOWED_FILE
- + "the data is [device:{}, delete time:{}]",
- getProcessorName(), deviceId, timestamp);
- emptyTsFileResource.setStartTime(deviceId, 0L);
- emptyTsFileResource.setEndTime(deviceId, getLastUpdateTime(deviceId));
- emptyTsFileResource.changeTypeToChanged(isMerging);
- } else {
- List<TsFileResource> temp = invertedIndexOfFiles.get(deviceId);
- int index = searchIndexNodeByTimestamp(deviceId, timestamp, temp);
- for (int i = 0; i <= index; i++) {
- temp.get(i).changeTypeToChanged(isMerging);
- if (isMerging == FileNodeProcessorStatus.MERGING_WRITE) {
- temp.get(i).addMergeChanged(deviceId);
- }
- }
- }
- }
-
- /**
- * Search the index of the interval by the timestamp.
- *
- * @return index of interval
- */
- private int searchIndexNodeByTimestamp(String deviceId, long timestamp,
- List<TsFileResource> fileList) {
- int index = 1;
- while (index < fileList.size()) {
- if (timestamp < fileList.get(index).getStartTime(deviceId)) {
- break;
- } else {
- index++;
- }
- }
- return index - 1;
- }
-
- /**
- * add multiple pass lock.
- */
- public int addMultiPassCount() {
- LOGGER.debug("Add MultiPassCount: read lock newMultiPassCount.");
- newMultiPassCount.incrementAndGet();
- while (newMultiPassTokenSet.contains(multiPassLockToken)) {
- multiPassLockToken++;
- }
- newMultiPassTokenSet.add(multiPassLockToken);
- LOGGER.debug("Add multi token:{}, nsPath:{}.", multiPassLockToken, getProcessorName());
- return multiPassLockToken;
- }
-
- /**
- * decrease multiple pass count. TODO: use the return value or remove it.
- */
- public boolean decreaseMultiPassCount(int token) throws FileNodeProcessorException {
- if (newMultiPassTokenSet.contains(token)) {
- int newMultiPassCountValue = newMultiPassCount.decrementAndGet();
- if (newMultiPassCountValue < 0) {
- throw new FileNodeProcessorException(String
- .format("Remove MultiPassCount error, newMultiPassCount:%d", newMultiPassCountValue));
- }
- newMultiPassTokenSet.remove(token);
- LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, count:{}", token,
- getProcessorName(),
- newMultiPassTokenSet, newMultiPassCount);
- return true;
- } else if (oldMultiPassTokenSet != null && oldMultiPassTokenSet.contains(token)) {
- // remove token first, then unlock
- oldMultiPassTokenSet.remove(token);
- oldMultiPassCount.countDown();
- long oldMultiPassCountValue = oldMultiPassCount.getCount();
- if (oldMultiPassCountValue < 0) {
- throw new FileNodeProcessorException(String
- .format("Remove MultiPassCount error, oldMultiPassCount:%d", oldMultiPassCountValue));
- }
- LOGGER.debug("Remove multi token:{}, old set:{}, count:{}", token, oldMultiPassTokenSet,
- oldMultiPassCount.getCount());
- return true;
- } else {
- LOGGER.error("remove token error:{},new set:{}, old set:{}", token, newMultiPassTokenSet,
- oldMultiPassTokenSet);
- // should add throw exception
- return false;
- }
- }
-
- /**
- * query data.
- */
- public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId,
- QueryContext context) throws FileNodeProcessorException {
- // query overflow data
- MeasurementSchema mSchema;
- TSDataType dataType;
-
- //mSchema = mManager.getSchemaForOnePath(deviceId + "." + measurementId);
- mSchema = fileSchema.getMeasurementSchema(measurementId);
- dataType = mSchema.getType();
-
- OverflowSeriesDataSource overflowSeriesDataSource;
- try {
- overflowSeriesDataSource = overflowProcessor.query(deviceId, measurementId, dataType,
- mSchema.getProps(), context);
- } catch (IOException e) {
- throw new FileNodeProcessorException(e);
- }
- // tsfile dataØØ
- List<TsFileResource> bufferwriteDataInFiles = new ArrayList<>();
- for (TsFileResource tsFileResource : newFileNodes) {
- // add the same tsFileResource, but not the same reference
- if (tsFileResource.isClosed()) {
- bufferwriteDataInFiles.add(tsFileResource.backUp());
- }
- }
- Pair<ReadOnlyMemChunk, List<ChunkMetaData>> bufferwritedata = new Pair<>(null, null);
- // bufferwrite data
- UnsealedTsFile unsealedTsFile = null;
-
- if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 1).isClosed()
- && !newFileNodes.get(newFileNodes.size() - 1).getStartTimeMap().isEmpty()) {
- unsealedTsFile = new UnsealedTsFile();
- unsealedTsFile.setFilePath(newFileNodes.get(newFileNodes.size() - 1).getFile().getAbsolutePath());
- if (bufferWriteProcessor == null) {
- throw new FileNodeProcessorException(String.format(
- "The last of tsfile %s in filenode processor %s is not closed, "
- + "but the bufferwrite processor is null.",
- newFileNodes.get(newFileNodes.size() - 1).getFile().getAbsolutePath(), getProcessorName()));
- }
- try {
- bufferwritedata = bufferWriteProcessor
- .queryBufferWriteData(deviceId, measurementId, dataType, mSchema.getProps());
- } catch (BufferWriteProcessorException e) {
- throw new FileNodeProcessorException(e);
- }
-
- try {
- List<Modification> pathModifications = context.getPathModifications(
- currentTsFileResource.getModFile(), deviceId
- + IoTDBConstant.PATH_SEPARATOR + measurementId
- );
- if (!pathModifications.isEmpty()) {
- QueryUtils.modifyChunkMetaData(bufferwritedata.right, pathModifications);
- }
- } catch (IOException e) {
- throw new FileNodeProcessorException(e);
- }
-
- unsealedTsFile.setTimeSeriesChunkMetaDatas(bufferwritedata.right);
- }
- SeriesDataSource seriesDataSource = new SeriesDataSource(
- new Path(deviceId + "." + measurementId), bufferwriteDataInFiles, unsealedTsFile,
- bufferwritedata.left);
- return new QueryDataSource(seriesDataSource, overflowSeriesDataSource);
-
- }
-
- /**
- * append one specified tsfile to this filenode processor.
- *
- * @param appendFile the appended tsfile information
- * @param appendFilePath the seriesPath of appended file
- */
- public void appendFile(TsFileResource appendFile, String appendFilePath)
- throws FileNodeProcessorException {
- try {
- if (!appendFile.getFile().getParentFile().exists()) {
- appendFile.getFile().getParentFile().mkdirs();
- }
- // move file
- File originFile = new File(appendFilePath);
- File targetFile = appendFile.getFile();
- if (!originFile.exists()) {
- throw new FileNodeProcessorException(
- String.format("The appended file %s does not exist.", appendFilePath));
- }
- if (targetFile.exists()) {
- throw new FileNodeProcessorException(
- String.format("The appended target file %s already exists.",
- appendFile.getFile().getAbsolutePath()));
- }
- if (!originFile.renameTo(targetFile)) {
- LOGGER.warn("File renaming failed when appending new file. Origin: {}, Target: {}",
- originFile.getPath(), targetFile.getPath());
- }
- // append the new tsfile
- this.newFileNodes.add(appendFile);
- // update the lastUpdateTime
- for (Entry<String, Long> entry : appendFile.getEndTimeMap().entrySet()) {
- lastUpdateTimeMap.put(entry.getKey(), entry.getValue());
- }
- bufferwriteFlushAction.act();
- fileNodeProcessorStore.setNewFileNodes(newFileNodes);
- // reconstruct the inverted index of the newFileNodes
- flushFileNodeProcessorAction.act();
- addAllFileIntoIndex(newFileNodes);
- } catch (Exception e) {
- LOGGER.error("Failed to append the tsfile {} to filenode processor {}.", appendFile,
- getProcessorName());
- throw new FileNodeProcessorException(e);
- }
- }
-
- /**
- * get overlap tsfiles which are conflict with the appendFile.
- *
- * @param appendFile the appended tsfile information
- */
- public List<String> getOverlapFiles(TsFileResource appendFile, String uuid)
- throws FileNodeProcessorException {
- List<String> overlapFiles = new ArrayList<>();
- try {
- for (TsFileResource tsFileResource : newFileNodes) {
- getOverlapFiles(appendFile, tsFileResource, uuid, overlapFiles);
- }
- } catch (IOException e) {
- LOGGER.error("Failed to get overlap tsfiles which conflict with the appendFile.");
- throw new FileNodeProcessorException(e);
- }
- return overlapFiles;
- }
-
- private void getOverlapFiles(TsFileResource appendFile, TsFileResource tsFileResource,
- String uuid, List<String> overlapFiles) throws IOException {
- for (Entry<String, Long> entry : appendFile.getStartTimeMap().entrySet()) {
- if (tsFileResource.getStartTimeMap().containsKey(entry.getKey()) &&
- tsFileResource.getEndTime(entry.getKey()) >= entry.getValue()
- && tsFileResource.getStartTime(entry.getKey()) <= appendFile
- .getEndTime(entry.getKey())) {
- String relativeFilePath =
- Constans.SYNC_SERVER + File.separatorChar + uuid + File.separatorChar
- + Constans.BACK_UP_DIRECTORY_NAME
- + File.separatorChar + tsFileResource.getRelativePath();
- File newFile = new File(
- Directories.getInstance().getTsFileFolder(tsFileResource.getBaseDirIndex()),
- relativeFilePath);
- if (!newFile.getParentFile().exists()) {
- newFile.getParentFile().mkdirs();
- }
- java.nio.file.Path link = FileSystems.getDefault().getPath(newFile.getPath());
- java.nio.file.Path target = FileSystems.getDefault()
- .getPath(tsFileResource.getFile().getAbsolutePath());
- Files.createLink(link, target);
- overlapFiles.add(newFile.getPath());
- break;
- }
- }
- }
-
- /**
- * add time series.
- */
- public void addTimeSeries(String measurementId, TSDataType dataType, TSEncoding encoding,
- CompressionType compressor, Map<String, String> props) {
- fileSchema.registerMeasurement(new MeasurementSchema(measurementId, dataType, encoding,
- compressor, props));
- }
-
- /**
- * submit the merge task to the <code>MergePool</code>.
- *
- * @return null -can't submit the merge task, because this filenode is not overflowed or it is
- * merging now. Future - submit the merge task successfully.
- */
- Future submitToMerge() {
- ZoneId zoneId = IoTDBDescriptor.getInstance().getConfig().getZoneID();
- if (lastMergeTime > 0) {
- long thisMergeTime = System.currentTimeMillis();
- long mergeTimeInterval = thisMergeTime - lastMergeTime;
- ZonedDateTime lastDateTime = ofInstant(Instant.ofEpochMilli(lastMergeTime),
- zoneId);
- ZonedDateTime thisDateTime = ofInstant(Instant.ofEpochMilli(thisMergeTime),
- zoneId);
- LOGGER.info(
- "The filenode {} last merge time is {}, this merge time is {}, "
- + "merge time interval is {}s",
- getProcessorName(), lastDateTime, thisDateTime, mergeTimeInterval / 1000);
- }
- lastMergeTime = System.currentTimeMillis();
-
- if (overflowProcessor != null && !overflowProcessor.isClosed()) {
- if (overflowProcessor.getFileSize() < IoTDBDescriptor.getInstance()
- .getConfig().getOverflowFileSizeThreshold()) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info(
- "Skip this merge taks submission, because the size{} of overflow processor {} "
- + "does not reaches the threshold {}.",
- MemUtils.bytesCntToStr(overflowProcessor.getFileSize()), getProcessorName(),
- MemUtils.bytesCntToStr(
- IoTDBDescriptor.getInstance().getConfig().getOverflowFileSizeThreshold()));
- }
- return null;
- }
- } else {
- LOGGER.info(
- "Skip this merge taks submission, because the filenode processor {} "
- + "has no overflow processor.",
- getProcessorName());
- return null;
- }
- if (isOverflowed && isMerging == FileNodeProcessorStatus.NONE) {
- Runnable mergeThread;
- mergeThread = new MergeRunnale();
- LOGGER.info("Submit the merge task, the merge filenode is {}", getProcessorName());
- return MergeManager.getInstance().submit(mergeThread);
- } else {
- if (!isOverflowed) {
- LOGGER.info(
- "Skip this merge taks submission, because the filenode processor {} is not " +
- "overflowed.",
- getProcessorName());
- } else {
- LOGGER.warn(
- "Skip this merge task submission, because last merge task is not over yet, "
- + "the merge filenode processor is {}",
- getProcessorName());
- }
- }
- return null;
- }
-
- /**
- * Prepare for merge, close the bufferwrite and overflow.
- */
- private void prepareForMerge() {
- try {
- LOGGER.info("The filenode processor {} prepares for merge, closes the bufferwrite processor",
- getProcessorName());
- closeBufferWrite();
- // try to get overflow processor
- getOverflowProcessor(getProcessorName());
- // must close the overflow processor
- while (!getOverflowProcessor().canBeClosed()) {
- waitForClosing();
- }
- LOGGER.info("The filenode processor {} prepares for merge, closes the overflow processor",
- getProcessorName());
- getOverflowProcessor().close();
- } catch (FileNodeProcessorException | OverflowProcessorException | IOException e) {
- LOGGER.error("The filenode processor {} prepares for merge error.", getProcessorName());
- writeUnlock();
- throw new ErrorDebugException(e);
- }
- }
-
- private void waitForClosing() {
- try {
- LOGGER.info(
- "The filenode processor {} prepares for merge, the overflow {} can't be closed, "
- + "wait 100ms,",
- getProcessorName(), getProcessorName());
- TimeUnit.MICROSECONDS.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * Merge this storage group, merge the tsfile data with overflow data.
- */
- public void merge() throws FileNodeProcessorException {
- // close bufferwrite and overflow, prepare for merge
- LOGGER.info("The filenode processor {} begins to merge.", getProcessorName());
- prepareForMerge();
- // change status from overflowed to no overflowed
- isOverflowed = false;
- // change status from work to merge
- isMerging = FileNodeProcessorStatus.MERGING_WRITE;
- // check the empty file
- Map<String, Long> startTimeMap = emptyTsFileResource.getStartTimeMap();
- mergeCheckEmptyFile(startTimeMap);
-
- for (TsFileResource tsFileResource : newFileNodes) {
- if (tsFileResource.getOverflowChangeType() != OverflowChangeType.NO_CHANGE) {
- tsFileResource.setOverflowChangeType(OverflowChangeType.CHANGED);
- }
- }
-
- addAllFileIntoIndex(newFileNodes);
- synchronized (fileNodeProcessorStore) {
- fileNodeProcessorStore.setOverflowed(isOverflowed);
- fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging);
- fileNodeProcessorStore.setNewFileNodes(newFileNodes);
- fileNodeProcessorStore.setEmptyTsFileResource(emptyTsFileResource);
- // flush this filenode information
- try {
- writeStoreToDisk(fileNodeProcessorStore);
- } catch (FileNodeProcessorException e) {
- LOGGER.error("The filenode processor {} writes restore information error when merging.",
- getProcessorName(), e);
- writeUnlock();
- throw new FileNodeProcessorException(e);
- }
- }
- // add numOfMergeFile to control the number of the merge file
- List<TsFileResource> backupIntervalFiles;
-
- backupIntervalFiles = switchFileNodeToMerge();
- //
- // clear empty file
- //
- boolean needEmtpy = false;
- if (emptyTsFileResource.getOverflowChangeType() != OverflowChangeType.NO_CHANGE) {
- needEmtpy = true;
- }
- emptyTsFileResource.clear();
- // attention
- try {
- if (overflowProcessor.isClosed()) {
- overflowProcessor.reopen();
- }
- overflowProcessor.switchWorkToMerge();
- } catch (IOException e) {
- LOGGER.error("The filenode processor {} can't switch overflow processor from work to merge.",
- getProcessorName(), e);
- writeUnlock();
- throw new FileNodeProcessorException(e);
- }
- LOGGER.info("The filenode processor {} switches from {} to {}.", getProcessorName(),
- FileNodeProcessorStatus.NONE, FileNodeProcessorStatus.MERGING_WRITE);
- writeUnlock();
-
- // query tsfile data and overflow data, and merge them
- int numOfMergeFiles = 0;
- int allNeedMergeFiles = backupIntervalFiles.size();
- for (TsFileResource backupIntervalFile : backupIntervalFiles) {
- numOfMergeFiles++;
- if (backupIntervalFile.getOverflowChangeType() == OverflowChangeType.CHANGED) {
- // query data and merge
- String filePathBeforeMerge = backupIntervalFile.getRelativePath();
- try {
- LOGGER.info(
- "The filenode processor {} begins merging the {}/{} tsfile[{}] with "
- + "overflow file, the process is {}%",
- getProcessorName(), numOfMergeFiles, allNeedMergeFiles, filePathBeforeMerge,
- (int) (((numOfMergeFiles - 1) / (float) allNeedMergeFiles) * 100));
- long startTime = System.currentTimeMillis();
- String newFile = queryAndWriteDataForMerge(backupIntervalFile);
- long endTime = System.currentTimeMillis();
- long timeConsume = endTime - startTime;
- ZoneId zoneId = IoTDBDescriptor.getInstance().getConfig().getZoneID();
- LOGGER.info(
- "The fileNode processor {} has merged the {}/{} tsfile[{}->{}] over, "
- + "start time of merge is {}, end time of merge is {}, "
- + "time consumption is {}ms,"
- + " the process is {}%",
- getProcessorName(), numOfMergeFiles, allNeedMergeFiles, filePathBeforeMerge,
- newFile, ofInstant(Instant.ofEpochMilli(startTime),
- zoneId), ofInstant(Instant.ofEpochMilli(endTime), zoneId), timeConsume,
- numOfMergeFiles / (float) allNeedMergeFiles * 100);
- } catch (IOException | PathErrorException e) {
- LOGGER.error("Merge: query and write data error.", e);
- throw new FileNodeProcessorException(e);
- }
- } else if (backupIntervalFile.getOverflowChangeType() == OverflowChangeType.MERGING_CHANGE) {
- LOGGER.error("The overflowChangeType of backupIntervalFile must not be {}",
- OverflowChangeType.MERGING_CHANGE);
- // handle this error, throw one runtime exception
- throw new FileNodeProcessorException(
- "The overflowChangeType of backupIntervalFile must not be "
- + OverflowChangeType.MERGING_CHANGE);
- } else {
- LOGGER.debug(
- "The filenode processor {} is merging, the interval file {} doesn't "
- + "need to be merged.",
- getProcessorName(), backupIntervalFile.getRelativePath());
- }
- }
-
- // change status from merge to wait
- switchMergeToWaiting(backupIntervalFiles, needEmtpy);
-
- // change status from wait to work
- switchWaitingToWorking();
- }
-
- private void mergeCheckEmptyFile(Map<String, Long> startTimeMap) {
- if (emptyTsFileResource.getOverflowChangeType() == OverflowChangeType.NO_CHANGE) {
- return;
- }
- Iterator<Entry<String, Long>> iterator = emptyTsFileResource.getEndTimeMap().entrySet()
- .iterator();
- while (iterator.hasNext()) {
- Entry<String, Long> entry = iterator.next();
- String deviceId = entry.getKey();
- if (invertedIndexOfFiles.containsKey(deviceId)) {
- invertedIndexOfFiles.get(deviceId).get(0).setOverflowChangeType(OverflowChangeType.CHANGED);
- startTimeMap.remove(deviceId);
- iterator.remove();
- }
- }
- if (emptyTsFileResource.checkEmpty()) {
- emptyTsFileResource.clear();
- } else {
- if (!newFileNodes.isEmpty()) {
- TsFileResource first = newFileNodes.get(0);
- for (String deviceId : emptyTsFileResource.getStartTimeMap().keySet()) {
- first.setStartTime(deviceId, emptyTsFileResource.getStartTime(deviceId));
- first.setEndTime(deviceId, emptyTsFileResource.getEndTime(deviceId));
- first.setOverflowChangeType(OverflowChangeType.CHANGED);
- }
- emptyTsFileResource.clear();
- } else {
- emptyTsFileResource.setOverflowChangeType(OverflowChangeType.CHANGED);
- }
- }
- }
-
- private List<TsFileResource> switchFileNodeToMerge() throws FileNodeProcessorException {
- List<TsFileResource> result = new ArrayList<>();
- if (emptyTsFileResource.getOverflowChangeType() != OverflowChangeType.NO_CHANGE) {
- // add empty
- result.add(emptyTsFileResource.backUp());
- if (!newFileNodes.isEmpty()) {
- throw new FileNodeProcessorException(
- String.format("The status of empty file is %s, but the new file list is not empty",
- emptyTsFileResource.getOverflowChangeType()));
- }
- return result;
- }
- if (newFileNodes.isEmpty()) {
- LOGGER.error("No file was changed when merging, the filenode is {}", getProcessorName());
- throw new FileNodeProcessorException(
- "No file was changed when merging, the filenode is " + getProcessorName());
- }
- for (TsFileResource tsFileResource : newFileNodes) {
- updateFileNode(tsFileResource, result);
- }
- return result;
- }
-
- private void updateFileNode(TsFileResource tsFileResource, List<TsFileResource> result) {
- if (tsFileResource.getOverflowChangeType() == OverflowChangeType.NO_CHANGE) {
- result.add(tsFileResource.backUp());
- } else {
- Map<String, Long> startTimeMap = new HashMap<>();
- Map<String, Long> endTimeMap = new HashMap<>();
- for (String deviceId : tsFileResource.getEndTimeMap().keySet()) {
- List<TsFileResource> temp = invertedIndexOfFiles.get(deviceId);
- int index = temp.indexOf(tsFileResource);
- int size = temp.size();
- // start time
- if (index == 0) {
- startTimeMap.put(deviceId, 0L);
- } else {
- startTimeMap.put(deviceId, tsFileResource.getStartTime(deviceId));
- }
- // end time
- if (index < size - 1) {
- endTimeMap.put(deviceId, temp.get(index + 1).getStartTime(deviceId) - 1);
- } else {
- endTimeMap.put(deviceId, tsFileResource.getEndTime(deviceId));
- }
- }
- TsFileResource node = new TsFileResource(startTimeMap, endTimeMap, tsFileResource.getFile());
- result.add(node);
- }
- }
-
- private void switchMergeToWaiting(List<TsFileResource> backupIntervalFiles, boolean needEmpty)
- throws FileNodeProcessorException {
- LOGGER.info("The status of filenode processor {} switches from {} to {}.", getProcessorName(),
- FileNodeProcessorStatus.MERGING_WRITE, FileNodeProcessorStatus.WAITING);
- writeLock();
- try {
- oldMultiPassTokenSet = newMultiPassTokenSet;
- oldMultiPassCount = new CountDownLatch(newMultiPassCount.get());
- newMultiPassTokenSet = new HashSet<>();
- newMultiPassCount = new AtomicInteger(0);
- List<TsFileResource> result = new ArrayList<>();
- int beginIndex = 0;
- if (needEmpty) {
- TsFileResource empty = backupIntervalFiles.get(0);
- if (!empty.checkEmpty()) {
- updateEmpty(empty, result);
- beginIndex++;
- }
- }
- // reconstruct the file index
- addAllFileIntoIndex(backupIntervalFiles);
- // check the merge changed file
- for (int i = beginIndex; i < backupIntervalFiles.size(); i++) {
- TsFileResource newFile = newFileNodes.get(i - beginIndex);
- TsFileResource temp = backupIntervalFiles.get(i);
- if (newFile.getOverflowChangeType() == OverflowChangeType.MERGING_CHANGE) {
- updateMergeChanged(newFile, temp);
- }
- if (!temp.checkEmpty()) {
- result.add(temp);
- }
- }
- // add new file when merge
- for (int i = backupIntervalFiles.size() - beginIndex; i < newFileNodes.size(); i++) {
- TsFileResource fileNode = newFileNodes.get(i);
- if (fileNode.isClosed()) {
- result.add(fileNode.backUp());
- } else {
- result.add(fileNode);
- }
- }
-
- isMerging = FileNodeProcessorStatus.WAITING;
- newFileNodes = result;
- // reconstruct the index
- addAllFileIntoIndex(newFileNodes);
- // clear merge changed
- for (TsFileResource fileNode : newFileNodes) {
- fileNode.clearMergeChanged();
- }
-
- synchronized (fileNodeProcessorStore) {
- fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging);
- fileNodeProcessorStore.setEmptyTsFileResource(emptyTsFileResource);
- fileNodeProcessorStore.setNewFileNodes(newFileNodes);
- try {
- writeStoreToDisk(fileNodeProcessorStore);
- } catch (FileNodeProcessorException e) {
- LOGGER.error(
- "Merge: failed to write filenode information to revocery file, the filenode is " +
- "{}.",
- getProcessorName(), e);
- throw new FileNodeProcessorException(
- "Merge: write filenode information to revocery file failed, the filenode is "
- + getProcessorName());
- }
- }
- } finally {
- writeUnlock();
- }
- }
-
- private void updateEmpty(TsFileResource empty, List<TsFileResource> result) {
- for (String deviceId : empty.getStartTimeMap().keySet()) {
- if (invertedIndexOfFiles.containsKey(deviceId)) {
- TsFileResource temp = invertedIndexOfFiles.get(deviceId).get(0);
- if (temp.getMergeChanged().contains(deviceId)) {
- empty.setOverflowChangeType(OverflowChangeType.CHANGED);
- break;
- }
- }
- }
- empty.clearMergeChanged();
- result.add(empty.backUp());
- }
-
- private void updateMergeChanged(TsFileResource newFile, TsFileResource temp) {
- for (String deviceId : newFile.getMergeChanged()) {
- if (temp.getStartTimeMap().containsKey(deviceId)) {
- temp.setOverflowChangeType(OverflowChangeType.CHANGED);
- } else {
- changeTypeToChanged(deviceId, newFile.getStartTime(deviceId),
- newFile.getEndTime(deviceId));
- }
- }
- }
-
-
- private void switchWaitingToWorking()
- throws FileNodeProcessorException {
-
- LOGGER.info("The status of filenode processor {} switches from {} to {}.", getProcessorName(),
- FileNodeProcessorStatus.WAITING, FileNodeProcessorStatus.NONE);
-
- if (oldMultiPassCount != null) {
- LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Count is {}",
- oldMultiPassTokenSet,
- oldMultiPassCount);
- try {
- oldMultiPassCount.await();
- } catch (InterruptedException e) {
- LOGGER.info(
- "The filenode processor {} encountered an error when it waits for all old queries over.",
- getProcessorName());
- throw new FileNodeProcessorException(e);
- }
- }
-
- try {
- writeLock();
- try {
- // delete the all files which are in the newFileNodes
- // notice: the last restore file of the interval file
-
- List<String> bufferwriteDirPathList = directories.getAllTsFileFolders();
- List<File> bufferwriteDirList = new ArrayList<>();
- collectBufferWriteDirs(bufferwriteDirPathList, bufferwriteDirList);
-
- Set<String> bufferFiles = new HashSet<>();
- collectBufferWriteFiles(bufferFiles);
-
- // add the restore file, if the last file is not closed
- if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 1).isClosed()) {
- String bufferFileRestorePath =
- newFileNodes.get(newFileNodes.size() - 1).getFile().getAbsolutePath() + RESTORE_FILE_SUFFIX;
- bufferFiles.add(bufferFileRestorePath);
- }
-
- deleteBufferWriteFiles(bufferwriteDirList, bufferFiles);
-
- // merge switch
- changeFileNodes();
-
- // overflow switch from merge to work
- overflowProcessor.switchMergeToWork();
-
- // write status to file
- isMerging = FileNodeProcessorStatus.NONE;
- synchronized (fileNodeProcessorStore) {
- fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging);
- fileNodeProcessorStore.setNewFileNodes(newFileNodes);
- fileNodeProcessorStore.setEmptyTsFileResource(emptyTsFileResource);
- writeStoreToDisk(fileNodeProcessorStore);
- }
- } catch (IOException e) {
- LOGGER.info(
- "The filenode processor {} encountered an error when its "
- + "status switched from {} to {}.",
- getProcessorName(), FileNodeProcessorStatus.NONE,
- FileNodeProcessorStatus.MERGING_WRITE);
- throw new FileNodeProcessorException(e);
- } finally {
- writeUnlock();
- }
- } finally {
- oldMultiPassTokenSet = null;
- oldMultiPassCount = null;
- }
-
- }
-
- private void collectBufferWriteDirs(List<String> bufferwriteDirPathList,
- List<File> bufferwriteDirList) {
- for (String bufferwriteDirPath : bufferwriteDirPathList) {
- if (bufferwriteDirPath.length() > 0
- && bufferwriteDirPath.charAt(bufferwriteDirPath.length() - 1)
- != File.separatorChar) {
- bufferwriteDirPath = bufferwriteDirPath + File.separatorChar;
- }
- bufferwriteDirPath = bufferwriteDirPath + getProcessorName();
- File bufferwriteDir = new File(bufferwriteDirPath);
- bufferwriteDirList.add(bufferwriteDir);
- if (!bufferwriteDir.exists()) {
- bufferwriteDir.mkdirs();
- }
- }
- }
-
- private void collectBufferWriteFiles(Set<String> bufferFiles) {
- for (TsFileResource bufferFileNode : newFileNodes) {
- String bufferFilePath = bufferFileNode.getFile().getAbsolutePath();
- if (bufferFilePath != null) {
- bufferFiles.add(bufferFilePath);
- }
- }
- }
-
- private void deleteBufferWriteFiles(List<File> bufferwriteDirList, Set<String> bufferFiles)
- throws IOException {
- for (File bufferwriteDir : bufferwriteDirList) {
- File[] files = bufferwriteDir.listFiles();
- if (files == null) {
- continue;
- }
- for (File file : files) {
- if (!bufferFiles.contains(file.getPath())) {
- FileReaderManager.getInstance().closeFileAndRemoveReader(file.getPath());
- if (!file.delete()) {
- LOGGER.warn("Cannot delete BufferWrite file {}", file.getPath());
- }
- }
- }
- }
- }
-
- private void changeFileNodes() {
- for (TsFileResource fileNode : newFileNodes) {
- if (fileNode.getOverflowChangeType() != OverflowChangeType.NO_CHANGE) {
- fileNode.setOverflowChangeType(OverflowChangeType.CHANGED);
- }
- }
- }
-
- private String queryAndWriteDataForMerge(TsFileResource backupIntervalFile)
- throws IOException, FileNodeProcessorException, PathErrorException {
- Map<String, Long> startTimeMap = new HashMap<>();
- Map<String, Long> endTimeMap = new HashMap<>();
-
- mergeFileWriter = null;
- mergeOutputPath = null;
- mergeBaseDir = null;
- mergeFileName = null;
- // modifications are blocked before mergeModification is created to avoid
- // losing some modification.
- mergeDeleteLock.lock();
- QueryContext context = new QueryContext();
- try {
- FileReaderManager.getInstance().increaseFileReaderReference(backupIntervalFile.getFilePath(),
- true);
- for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
- // query one deviceId
- List<Path> pathList = new ArrayList<>();
- mergeIsChunkGroupHasData = false;
- mergeStartPos = -1;
- ChunkGroupFooter footer;
- int numOfChunk = 0;
- try {
- List<String> pathStrings = mManager.getLeafNodePathInNextLevel(deviceId);
- for (String string : pathStrings) {
- pathList.add(new Path(string));
- }
- } catch (PathErrorException e) {
- LOGGER.error("Can't get all the paths from MManager, the deviceId is {}", deviceId);
- throw new FileNodeProcessorException(e);
- }
- if (pathList.isEmpty()) {
- continue;
- }
- for (Path path : pathList) {
- // query one measurement in the special deviceId
- String measurementId = path.getMeasurement();
- TSDataType dataType = mManager.getSeriesType(path.getFullPath());
- OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor.queryMerge(deviceId,
- measurementId, dataType, true, context);
- Filter timeFilter = FilterFactory
- .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
- TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
- SingleSeriesExpression seriesFilter = new SingleSeriesExpression(path, timeFilter);
-
- for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource
- .getOverflowInsertFileList()) {
- FileReaderManager.getInstance()
- .increaseFileReaderReference(overflowInsertFile.getFilePath(),
- false);
- }
-
- IReader seriesReader = SeriesReaderFactory.getInstance()
- .createSeriesReaderForMerge(backupIntervalFile,
- overflowSeriesDataSource, seriesFilter, context);
- numOfChunk += queryAndWriteSeries(seriesReader, path, seriesFilter, dataType,
- startTimeMap, endTimeMap, overflowSeriesDataSource);
- }
- if (mergeIsChunkGroupHasData) {
- // end the new rowGroupMetadata
- long size = mergeFileWriter.getPos() - mergeStartPos;
- footer = new ChunkGroupFooter(deviceId, size, numOfChunk);
- mergeFileWriter.endChunkGroup(footer, 0);
- }
- }
- } finally {
- FileReaderManager.getInstance().decreaseFileReaderReference(backupIntervalFile.getFilePath(),
- true);
-
- if (mergeDeleteLock.isLocked()) {
- mergeDeleteLock.unlock();
- }
- }
-
- if (mergeFileWriter != null) {
- mergeFileWriter.endFile(fileSchema);
- }
- backupIntervalFile.setFile(new File(mergeBaseDir + File.separator + mergeFileName));
- backupIntervalFile.setOverflowChangeType(OverflowChangeType.NO_CHANGE);
- backupIntervalFile.setStartTimeMap(startTimeMap);
- backupIntervalFile.setEndTimeMap(endTimeMap);
- backupIntervalFile.setModFile(mergingModification);
- mergingModification = null;
- return mergeFileName;
- }
-
- private int queryAndWriteSeries(IReader seriesReader, Path path,
- SingleSeriesExpression seriesFilter, TSDataType dataType,
- Map<String, Long> startTimeMap, Map<String, Long> endTimeMap,
- OverflowSeriesDataSource overflowSeriesDataSource)
- throws IOException {
- int numOfChunk = 0;
- try {
- if (!seriesReader.hasNext()) {
- LOGGER.debug(
- "The time-series {} has no data with the filter {} in the filenode processor {}",
- path, seriesFilter, getProcessorName());
- } else {
- numOfChunk++;
- TimeValuePair timeValuePair = seriesReader.next();
- if (mergeFileWriter == null) {
- mergeBaseDir = directories.getNextFolderForTsfile();
- mergeFileName = timeValuePair.getTimestamp()
- + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis();
- mergeOutputPath = constructOutputFilePath(mergeBaseDir, getProcessorName(),
- mergeFileName);
- mergeFileName = getProcessorName() + File.separatorChar + mergeFileName;
- mergeFileWriter = new TsFileIOWriter(new File(mergeOutputPath));
- mergingModification = new ModificationFile(mergeOutputPath
- + ModificationFile.FILE_SUFFIX);
- mergeDeleteLock.unlock();
- }
- if (!mergeIsChunkGroupHasData) {
- // start a new rowGroupMetadata
- mergeIsChunkGroupHasData = true;
- // the datasize and numOfChunk is fake
- // the accurate datasize and numOfChunk will get after write all this device data.
- mergeFileWriter.startFlushChunkGroup(path.getDevice());// TODO please check me.
- mergeStartPos = mergeFileWriter.getPos();
- }
- // init the serieswWriteImpl
- MeasurementSchema measurementSchema = fileSchema
- .getMeasurementSchema(path.getMeasurement());
- ChunkBuffer pageWriter = new ChunkBuffer(measurementSchema);
- int pageSizeThreshold = TSFileConfig.pageSizeInByte;
- ChunkWriterImpl seriesWriterImpl = new ChunkWriterImpl(measurementSchema, pageWriter,
- pageSizeThreshold);
- // write the series data
- writeOneSeries(path.getDevice(), seriesWriterImpl, dataType,
- seriesReader,
- startTimeMap, endTimeMap, timeValuePair);
- // flush the series data
- seriesWriterImpl.writeToFileWriter(mergeFileWriter);
- }
- } finally {
- for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource
- .getOverflowInsertFileList()) {
- FileReaderManager.getInstance()
- .decreaseFileReaderReference(overflowInsertFile.getFilePath(),
- false);
- }
- }
- return numOfChunk;
- }
-
-
- private void writeOneSeries(String deviceId, ChunkWriterImpl seriesWriterImpl,
- TSDataType dataType, IReader seriesReader, Map<String, Long> startTimeMap,
- Map<String, Long> endTimeMap, TimeValuePair firstTVPair) throws IOException {
- long startTime;
- long endTime;
- TimeValuePair localTV = firstTVPair;
- writeTVPair(seriesWriterImpl, dataType, localTV);
- startTime = endTime = localTV.getTimestamp();
- if (!startTimeMap.containsKey(deviceId) || startTimeMap.get(deviceId) > startTime) {
- startTimeMap.put(deviceId, startTime);
- }
- if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) < endTime) {
- endTimeMap.put(deviceId, endTime);
- }
- while (seriesReader.hasNext()) {
- localTV = seriesReader.next();
- endTime = localTV.getTimestamp();
- writeTVPair(seriesWriterImpl, dataType, localTV);
- }
- if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) < endTime) {
- endTimeMap.put(deviceId, endTime);
- }
- }
-
- private void writeTVPair(ChunkWriterImpl seriesWriterImpl, TSDataType dataType,
- TimeValuePair timeValuePair) throws IOException {
- switch (dataType) {
- case BOOLEAN:
- seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
- break;
- case INT32:
- seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt());
- break;
- case INT64:
- seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong());
- break;
- case FLOAT:
- seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat());
- break;
- case DOUBLE:
- seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
- break;
- case TEXT:
- seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
- break;
- default:
- LOGGER.error("Not support data type: {}", dataType);
- break;
- }
- }
-
-
- private String constructOutputFilePath(String baseDir, String processorName, String fileName) {
-
- String localBaseDir = baseDir;
- if (localBaseDir.charAt(localBaseDir.length() - 1) != File.separatorChar) {
- localBaseDir = localBaseDir + File.separatorChar + processorName;
- }
- File dataDir = new File(localBaseDir);
- if (!dataDir.exists()) {
- LOGGER.warn("The bufferwrite processor data dir doesn't exists, create new directory {}",
- localBaseDir);
- dataDir.mkdirs();
- }
- File outputFile = new File(dataDir, fileName);
- return outputFile.getPath();
- }
-
- private FileSchema constructFileSchema(String processorName) throws WriteProcessException {
-
- List<MeasurementSchema> columnSchemaList;
- columnSchemaList = mManager.getSchemaForFileName(processorName);
-
- FileSchema schema = new FileSchema();
- for (MeasurementSchema measurementSchema : columnSchemaList) {
- schema.registerMeasurement(measurementSchema);
- }
- return schema;
-
- }
-
- @Override
- public boolean canBeClosed() {
- if (isMerging != FileNodeProcessorStatus.NONE) {
- LOGGER.info("The filenode {} can't be closed, because the filenode status is {}",
- getProcessorName(),
- isMerging);
- return false;
- }
- if (newMultiPassCount.get() != 0) {
- LOGGER.warn("The filenode {} can't be closed, because newMultiPassCount is {}. The newMultiPassTokenSet is {}",
- getProcessorName(), newMultiPassCount, newMultiPassTokenSet);
- return false;
- }
-
- if (oldMultiPassCount == null) {
- return true;
- }
- if (oldMultiPassCount.getCount() == 0) {
- return true;
- } else {
- LOGGER.info("The filenode {} can't be closed, because oldMultiPassCount is {}",
- getProcessorName(), oldMultiPassCount.getCount());
- return false;
- }
- }
-
- @Override
- public FileNodeFlushFuture flush() throws IOException {
- Future<Boolean> bufferWriteFlushFuture = null;
- Future<Boolean> overflowFlushFuture = null;
- if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
- bufferWriteFlushFuture = bufferWriteProcessor.flush();
- }
- if (overflowProcessor != null && !overflowProcessor.isClosed()) {
- overflowFlushFuture = overflowProcessor.flush();
- }
- return new FileNodeFlushFuture(bufferWriteFlushFuture, overflowFlushFuture);
- }
-
- /**
- * Close the bufferwrite processor.
- */
- public void closeBufferWrite() throws FileNodeProcessorException {
- if (bufferWriteProcessor == null || bufferWriteProcessor.isClosed()) {
- return;
- }
- try {
- while (!bufferWriteProcessor.canBeClosed()) {
- waitForBufferWriteClose();
- }
- bufferWriteProcessor.close();
- bufferWriteProcessor = null;
- } catch (BufferWriteProcessorException e) {
- throw new FileNodeProcessorException(e);
- }
- }
-
- private void waitForBufferWriteClose() {
- try {
- LOGGER.info("The bufferwrite {} can't be closed, wait 100ms",
- bufferWriteProcessor.getProcessorName());
- TimeUnit.MICROSECONDS.sleep(100);
- } catch (InterruptedException e) {
- LOGGER.error("Unexpected interruption", e);
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * Close the overflow processor.
- */
- public void closeOverflow() throws FileNodeProcessorException {
- if (overflowProcessor == null || overflowProcessor.isClosed()) {
- return;
- }
- try {
- while (!overflowProcessor.canBeClosed()) {
- waitForOverflowClose();
- }
- overflowProcessor.close();
- } catch (OverflowProcessorException e) {
- throw new FileNodeProcessorException(e);
- }
- }
-
- private void waitForOverflowClose() {
- try {
- LOGGER.info("The overflow {} can't be closed, wait 100ms",
- overflowProcessor.getProcessorName());
- TimeUnit.MICROSECONDS.sleep(100);
- } catch (InterruptedException e) {
- LOGGER.error("Unexpected interruption", e);
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- public void close() throws FileNodeProcessorException {
- closeBufferWrite();
- closeOverflow();
- for (TsFileResource fileNode : newFileNodes) {
- if (fileNode.getModFile() != null) {
- try {
- fileNode.getModFile().close();
- } catch (IOException e) {
- throw new FileNodeProcessorException(e);
- }
- }
- }
- }
-
- /**
- * deregister the filenode processor.
- */
- public void delete() throws ProcessorException {
- if (TsFileDBConf.isEnableStatMonitor()) {
- // remove the monitor
- LOGGER.info("Deregister the filenode processor: {} from monitor.", getProcessorName());
- StatMonitor.getInstance().deregisterStatistics(statStorageDeltaName);
- }
- closeBufferWrite();
- closeOverflow();
- for (TsFileResource fileNode : newFileNodes) {
- if (fileNode.getModFile() != null) {
- try {
- fileNode.getModFile().close();
- } catch (IOException e) {
- throw new FileNodeProcessorException(e);
- }
- }
- }
- }
-
- @Override
- public long memoryUsage() {
- long memSize = 0;
- if (bufferWriteProcessor != null) {
- memSize += bufferWriteProcessor.memoryUsage();
- }
- if (overflowProcessor != null) {
- memSize += overflowProcessor.memoryUsage();
- }
- return memSize;
- }
-
- private void writeStoreToDisk(FileNodeProcessorStore fileNodeProcessorStore)
- throws FileNodeProcessorException {
-
- synchronized (fileNodeRestoreLock) {
- try (FileOutputStream fileOutputStream = new FileOutputStream(fileNodeRestoreFilePath)) {
- fileNodeProcessorStore.serialize(fileOutputStream);
- LOGGER.debug("The filenode processor {} writes restore information to the restore file",
- getProcessorName());
- } catch (IOException e) {
- throw new FileNodeProcessorException(e);
- }
- }
- }
-
- private FileNodeProcessorStore readStoreFromDisk() throws FileNodeProcessorException {
-
- synchronized (fileNodeRestoreLock) {
- File restoreFile = new File(fileNodeRestoreFilePath);
- if (!restoreFile.exists() || restoreFile.length() == 0) {
- try {
- return new FileNodeProcessorStore(false, new HashMap<>(),
- new TsFileResource(null, false),
- new ArrayList<>(), FileNodeProcessorStatus.NONE, 0);
- } catch (IOException e) {
- throw new FileNodeProcessorException(e);
- }
- }
- try (FileInputStream inputStream = new FileInputStream(fileNodeRestoreFilePath)) {
- return FileNodeProcessorStore.deSerialize(inputStream);
- } catch (IOException e) {
- LOGGER
- .error("Failed to deserialize the FileNodeRestoreFile {}, {}", fileNodeRestoreFilePath,
- e);
- throw new FileNodeProcessorException(e);
- }
- }
- }
-
- String getFileNodeRestoreFilePath() {
- return fileNodeRestoreFilePath;
- }
-
- /**
- * Delete data whose timestamp <= 'timestamp' and belong to timeseries deviceId.measurementId.
- *
- * @param deviceId the deviceId of the timeseries to be deleted.
- * @param measurementId the measurementId of the timeseries to be deleted.
- * @param timestamp the delete range is (0, timestamp].
- */
- public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
- // TODO: how to avoid partial deletion?
- mergeDeleteLock.lock();
- long version = versionController.nextVersion();
-
- // record what files are updated so we can roll back them in case of exception
- List<ModificationFile> updatedModFiles = new ArrayList<>();
-
- try {
- String fullPath = deviceId +
- IoTDBConstant.PATH_SEPARATOR + measurementId;
- Deletion deletion = new Deletion(fullPath, version, timestamp);
- if (mergingModification != null) {
- mergingModification.write(deletion);
- updatedModFiles.add(mergingModification);
- }
- deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
- // delete data in memory
- OverflowProcessor ofProcessor = getOverflowProcessor(getProcessorName());
- ofProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
- if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
- bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
- }
- } catch (Exception e) {
- // roll back
- for (ModificationFile modFile : updatedModFiles) {
- modFile.abort();
- }
- throw new IOException(e);
- } finally {
- mergeDeleteLock.unlock();
- }
- }
-
- private void deleteBufferWriteFiles(String deviceId, Deletion deletion,
- List<ModificationFile> updatedModFiles) throws IOException {
- if (currentTsFileResource != null && currentTsFileResource.containsDevice(deviceId)) {
- currentTsFileResource.getModFile().write(deletion);
- updatedModFiles.add(currentTsFileResource.getModFile());
- }
- for (TsFileResource fileNode : newFileNodes) {
- if (fileNode != currentTsFileResource && fileNode.containsDevice(deviceId)
- && fileNode.getStartTime(deviceId) <= deletion.getTimestamp()) {
- fileNode.getModFile().write(deletion);
- updatedModFiles.add(fileNode.getModFile());
- }
- }
- }
-
- /**
- * Similar to delete(), but only deletes data in BufferWrite. Only used by WAL recovery.
- */
- public void deleteBufferWrite(String deviceId, String measurementId, long timestamp)
- throws IOException, BufferWriteProcessorException {
- String fullPath = deviceId +
- IoTDBConstant.PATH_SEPARATOR + measurementId;
- long version = versionController.nextVersion();
- Deletion deletion = new Deletion(fullPath, version, timestamp);
-
- List<ModificationFile> updatedModFiles = new ArrayList<>();
- try {
- deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
- } catch (IOException e) {
- for (ModificationFile modificationFile : updatedModFiles) {
- modificationFile.abort();
- }
- throw e;
- }
- if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
- try {
- bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
- } catch (BufferWriteProcessorException e) {
- throw new IOException(e);
- }
- }
- }
-
- /**
- * Similar to delete(), but only deletes data in Overflow. Only used by WAL recovery.
- */
- public void deleteOverflow(String deviceId, String measurementId, long timestamp)
- throws IOException {
- long version = versionController.nextVersion();
-
- OverflowProcessor overflowProcessor = getOverflowProcessor(getProcessorName());
- List<ModificationFile> updatedModFiles = new ArrayList<>();
- try {
- overflowProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
- } catch (IOException e) {
- for (ModificationFile modificationFile : updatedModFiles) {
- modificationFile.abort();
- }
- throw e;
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- FileNodeProcessor that = (FileNodeProcessor) o;
- return isOverflowed == that.isOverflowed &&
- numOfMergeFile == that.numOfMergeFile &&
- lastMergeTime == that.lastMergeTime &&
- shouldRecovery == that.shouldRecovery &&
- multiPassLockToken == that.multiPassLockToken &&
- Objects.equals(statStorageDeltaName, that.statStorageDeltaName) &&
- Objects.equals(statParamsHashMap, that.statParamsHashMap) &&
- Objects.equals(lastUpdateTimeMap, that.lastUpdateTimeMap) &&
- Objects.equals(flushLastUpdateTimeMap, that.flushLastUpdateTimeMap) &&
- Objects.equals(invertedIndexOfFiles, that.invertedIndexOfFiles) &&
- Objects.equals(emptyTsFileResource, that.emptyTsFileResource) &&
- Objects.equals(currentTsFileResource, that.currentTsFileResource) &&
- Objects.equals(newFileNodes, that.newFileNodes) &&
- isMerging == that.isMerging &&
- Objects.equals(fileNodeProcessorStore, that.fileNodeProcessorStore) &&
- Objects.equals(fileNodeRestoreFilePath, that.fileNodeRestoreFilePath) &&
- Objects.equals(bufferWriteProcessor, that.bufferWriteProcessor) &&
- Objects.equals(overflowProcessor, that.overflowProcessor) &&
- Objects.equals(oldMultiPassTokenSet, that.oldMultiPassTokenSet) &&
- Objects.equals(newMultiPassTokenSet, that.newMultiPassTokenSet) &&
- Objects.equals(oldMultiPassCount, that.oldMultiPassCount) &&
- Objects.equals(newMultiPassCount, that.newMultiPassCount) &&
- Objects.equals(parameters, that.parameters) &&
- Objects.equals(fileSchema, that.fileSchema) &&
- Objects.equals(flushFileNodeProcessorAction, that.flushFileNodeProcessorAction) &&
- Objects.equals(bufferwriteFlushAction, that.bufferwriteFlushAction) &&
- Objects.equals(bufferwriteCloseAction, that.bufferwriteCloseAction) &&
- Objects.equals(overflowFlushAction, that.overflowFlushAction);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), statStorageDeltaName, statParamsHashMap, isOverflowed,
- lastUpdateTimeMap, flushLastUpdateTimeMap, invertedIndexOfFiles,
- emptyTsFileResource, currentTsFileResource, newFileNodes, isMerging,
- numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath,
- lastMergeTime, bufferWriteProcessor, overflowProcessor, oldMultiPassTokenSet,
- newMultiPassTokenSet, oldMultiPassCount, newMultiPassCount, shouldRecovery, parameters,
- fileSchema, flushFileNodeProcessorAction, bufferwriteFlushAction,
- bufferwriteCloseAction, overflowFlushAction, multiPassLockToken);
- }
-
- public class MergeRunnale implements Runnable {
-
- @Override
- public void run() {
- try {
- ZoneId zoneId = IoTDBDescriptor.getInstance().getConfig().getZoneID();
- long mergeStartTime = System.currentTimeMillis();
- writeLock();
- merge();
- long mergeEndTime = System.currentTimeMillis();
- long intervalTime = mergeEndTime - mergeStartTime;
- LOGGER.info(
- "The filenode processor {} merge start time is {}, "
- + "merge end time is {}, merge consumes {}ms.",
- getProcessorName(), ofInstant(Instant.ofEpochMilli(mergeStartTime),
- zoneId), ofInstant(Instant.ofEpochMilli(mergeEndTime),
- zoneId), intervalTime);
- } catch (FileNodeProcessorException e) {
- LOGGER.error("The filenode processor {} encountered an error when merging.",
- getProcessorName(), e);
- throw new ErrorDebugException(e);
- }
- }
- }
-}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStatus.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStatus.java
deleted file mode 100644
index b3ce1e5..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStatus.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenode;
-
-public enum FileNodeProcessorStatus {
- NONE, MERGING_WRITE, WAITING;
-
- public static FileNodeProcessorStatus deserialize(short i) {
- switch (i) {
- case 0:
- return NONE;
- case 1:
- return MERGING_WRITE;
- case 2:
- return WAITING;
- default:
- throw new IllegalArgumentException(
- String.format("Invalid input %d for FileNodeProcessorStatus", i));
- }
- }
-
- public short serialize() {
- switch (this) {
- case NONE:
- return 0;
- case MERGING_WRITE:
- return 1;
- case WAITING:
- return 2;
- default:
- throw new IllegalStateException("Unsupported type");
- }
-
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java
deleted file mode 100644
index 7c1eca4..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenode;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-/**
- * FileNodeProcessorStore is used to store information about FileNodeProcessor's status.
- * lastUpdateTime is changed and stored by BufferWrite flush or BufferWrite close.
- * emptyTsFileResource and newFileNodes are changed and stored by Overflow flush and
- * Overflow close. fileNodeProcessorState is changed and stored by the change of FileNodeProcessor's
- * status such as "work->merge merge->wait wait->work". numOfMergeFile is changed
- * and stored when FileNodeProcessor's status changes from work to merge.
- */
-public class FileNodeProcessorStore implements Serializable {
-
- private static final long serialVersionUID = -54525372941897565L;
-
- private boolean isOverflowed;
- private Map<String, Long> lastUpdateTimeMap;
- private TsFileResource emptyTsFileResource;
- private List<TsFileResource> newFileNodes;
- private int numOfMergeFile;
- private FileNodeProcessorStatus fileNodeProcessorStatus;
-
- /**
- * Constructor of FileNodeProcessorStore.
- *
- * @param isOverflowed whether this FileNode contains unmerged Overflow operations.
- * @param lastUpdateTimeMap the timestamp of last data point of each device in this FileNode.
- * @param emptyTsFileResource a place holder when the FileNode contains no TsFile.
- * @param newFileNodes TsFiles in the FileNode.
- * @param fileNodeProcessorStatus the status of the FileNode.
- * @param numOfMergeFile the number of files already merged in one merge operation.
- */
- public FileNodeProcessorStore(boolean isOverflowed, Map<String, Long> lastUpdateTimeMap,
- TsFileResource emptyTsFileResource,
- List<TsFileResource> newFileNodes,
- FileNodeProcessorStatus fileNodeProcessorStatus,
- int numOfMergeFile) {
- this.isOverflowed = isOverflowed;
- this.lastUpdateTimeMap = lastUpdateTimeMap;
- this.emptyTsFileResource = emptyTsFileResource;
- this.newFileNodes = newFileNodes;
- this.fileNodeProcessorStatus = fileNodeProcessorStatus;
- this.numOfMergeFile = numOfMergeFile;
- }
-
- public void serialize(OutputStream outputStream) throws IOException {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- ReadWriteIOUtils.write(this.isOverflowed, byteArrayOutputStream);
- // lastUpdateTimeMap
- ReadWriteIOUtils.write(lastUpdateTimeMap.size(), byteArrayOutputStream);
- for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), byteArrayOutputStream);
- ReadWriteIOUtils.write(entry.getValue(), byteArrayOutputStream);
- }
- this.emptyTsFileResource.serialize(byteArrayOutputStream);
- ReadWriteIOUtils.write(this.newFileNodes.size(), byteArrayOutputStream);
- for (TsFileResource tsFileResource : this.newFileNodes) {
- tsFileResource.serialize(byteArrayOutputStream);
- }
- ReadWriteIOUtils.write(this.numOfMergeFile, byteArrayOutputStream);
- ReadWriteIOUtils.write(this.fileNodeProcessorStatus.serialize(), byteArrayOutputStream);
- // buffer array to outputstream
- byteArrayOutputStream.writeTo(outputStream);
- }
-
- public static FileNodeProcessorStore deSerialize(InputStream inputStream) throws IOException {
- boolean isOverflowed = ReadWriteIOUtils.readBool(inputStream);
- Map<String, Long> lastUpdateTimeMap = new HashMap<>();
- int size = ReadWriteIOUtils.readInt(inputStream);
- for (int i = 0; i < size; i++) {
- String path = ReadWriteIOUtils.readString(inputStream);
- long time = ReadWriteIOUtils.readLong(inputStream);
- lastUpdateTimeMap.put(path, time);
- }
- TsFileResource emptyTsFileResource = TsFileResource.deSerialize(inputStream);
- size = ReadWriteIOUtils.readInt(inputStream);
- List<TsFileResource> newFileNodes = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- newFileNodes.add(TsFileResource.deSerialize(inputStream));
- }
- int numOfMergeFile = ReadWriteIOUtils.readInt(inputStream);
- FileNodeProcessorStatus fileNodeProcessorStatus = FileNodeProcessorStatus
- .deserialize(ReadWriteIOUtils.readShort(inputStream));
-
- return new FileNodeProcessorStore(isOverflowed, lastUpdateTimeMap, emptyTsFileResource,
- newFileNodes, fileNodeProcessorStatus, numOfMergeFile);
- }
-
- public boolean isOverflowed() {
- return isOverflowed;
- }
-
- public void setOverflowed(boolean isOverflowed) {
- this.isOverflowed = isOverflowed;
- }
-
- public FileNodeProcessorStatus getFileNodeProcessorStatus() {
- return fileNodeProcessorStatus;
- }
-
- public void setFileNodeProcessorStatus(FileNodeProcessorStatus fileNodeProcessorStatus) {
- this.fileNodeProcessorStatus = fileNodeProcessorStatus;
- }
-
- public Map<String, Long> getLastUpdateTimeMap() {
- return new HashMap<>(lastUpdateTimeMap);
- }
-
- public void setLastUpdateTimeMap(Map<String, Long> lastUpdateTimeMap) {
- this.lastUpdateTimeMap = lastUpdateTimeMap;
- }
-
- public TsFileResource getEmptyTsFileResource() {
- return emptyTsFileResource;
- }
-
- public void setEmptyTsFileResource(TsFileResource emptyTsFileResource) {
- this.emptyTsFileResource = emptyTsFileResource;
- }
-
- public List<TsFileResource> getNewFileNodes() {
- return newFileNodes;
- }
-
- public void setNewFileNodes(List<TsFileResource> newFileNodes) {
- this.newFileNodes = newFileNodes;
- }
-
- public int getNumOfMergeFile() {
- return numOfMergeFile;
- }
-
- public void setNumOfMergeFile(int numOfMergeFile) {
- this.numOfMergeFile = numOfMergeFile;
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java
deleted file mode 100644
index 48c8eee..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenode;
-
-/**
- * if a file is not changed by overflow, it's in NO_CHANGE;<br>
- * if it's changed and in NO_CHANGE previously, NO_CHANGE-->CHANGED, update file<br>
- * If it's changed and in CHANGED previously, and in merging, CHANGED-->MERGING_CHANGE, update file<br>
- * If it's changed and in CHANGED previously, and not in merging, do nothing<br>
- * After merging, if it's MERGING_CHANGE, MERGING_CHANGE-->CHANGED, otherwise in NO_CHANGE, MERGING_CHANGE-->NO_CHANGE
- */
-public enum OverflowChangeType {
- NO_CHANGE, CHANGED, MERGING_CHANGE;
-
- public short serialize() {
- switch (this) {
- case NO_CHANGE:
- return 0;
- case CHANGED:
- return 1;
- case MERGING_CHANGE:
- return 2;
- default:
- throw new IllegalStateException("Unsupported type");
- }
- }
-
- public static OverflowChangeType deserialize(short i) {
- switch (i) {
- case 0:
- return NO_CHANGE;
- case 1:
- return CHANGED;
- case 2:
- return MERGING_CHANGE;
- default:
- throw new IllegalArgumentException(
- String.format("Invalid input %d for OverflowChangeType", i));
- }
- }
-}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java
index a3604a3..606a7bd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.memcontrol;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.utils.MemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +60,7 @@ public class FlushPartialPolicy implements Policy {
private Thread createWorkerThread() {
return new Thread(() -> {
- FileNodeManager.getInstance().forceFlush(BasicMemController.UsageLevel.SAFE);
+ DatabaseEngineFactory.getCurrent().forceFlush(BasicMemController.UsageLevel.SAFE);
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/ForceFLushAllPolicy.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/ForceFLushAllPolicy.java
index 53533d9..3b50810 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/ForceFLushAllPolicy.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/ForceFLushAllPolicy.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.engine.memcontrol;
import org.apache.iotdb.db.concurrent.ThreadName;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.utils.MemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +55,7 @@ public class ForceFLushAllPolicy implements Policy {
private Thread createWorkerThread() {
return new Thread(() ->
- FileNodeManager.getInstance().forceFlush(BasicMemController.UsageLevel.DANGEROUS),
+ DatabaseEngineFactory.getCurrent().forceFlush(BasicMemController.UsageLevel.DANGEROUS),
ThreadName.FORCE_FLUSH_ALL_POLICY.getName());
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 466138f..288382f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.Processor;
import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.EngingeConstants;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
@@ -113,9 +113,9 @@ public class OverflowProcessor extends Processor {
}
this.parentPath = overflowDirPath + processorName;
- overflowFlushAction = parameters.get(FileNodeConstants.OVERFLOW_FLUSH_ACTION);
+ overflowFlushAction = parameters.get(EngingeConstants.OVERFLOW_FLUSH_ACTION);
filenodeFlushAction = parameters
- .get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
+ .get(EngingeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
reopen();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/SeriesDataSource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/SeriesDataSource.java
index fd7280f..9c797ef 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/SeriesDataSource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/SeriesDataSource.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.engine.querycontext;
import java.util.List;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
import org.apache.iotdb.tsfile.read.common.Path;
public class SeriesDataSource {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java
index b1488cc..45d6faa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.DatabaseEngine;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -142,10 +141,10 @@ public class StorageGroupManager implements IStatistic, IService, DatabaseEngine
}
@Override
- public Map<String, TSRecord> getAllStatisticsValue() {
+ public Map<String, InsertPlan> getAllStatisticsValue() {
long curTime = System.currentTimeMillis();
TSRecord tsRecord = StatMonitor
- .convertToTSRecord(getStatParamsHashMap(), MonitorConstants.STAT_STORAGE_DELTA_NAME,
+ .convertToInsertPlan(getStatParamsHashMap(), MonitorConstants.STAT_STORAGE_DELTA_NAME,
curTime);
HashMap<String, TSRecord> ret = new HashMap<>();
ret.put(MonitorConstants.STAT_STORAGE_DELTA_NAME, tsRecord);
@@ -603,7 +602,7 @@ public class StorageGroupManager implements IStatistic, IService, DatabaseEngine
}
@Override
- public void deleteOneStorageGroup(String processorName) throws StorageGroupManagerException {
+ public void deleteStorageGroup(String processorName) throws StorageGroupManagerException {
if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) {
return;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
index 238596c..42923ad 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
@@ -48,8 +48,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.Processor;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.EngingeConstants;
import org.apache.iotdb.db.engine.merge.MergeTask;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -268,7 +267,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
@Override
- public Map<String, TSRecord> getAllStatisticsValue() {
+ public Map<String, InsertPlan> getAllStatisticsValue() {
Long curTime = System.currentTimeMillis();
HashMap<String, TSRecord> tsRecordHashMap = new HashMap<>();
TSRecord tsRecord = new TSRecord(curTime, statStorageGroupName);
@@ -781,7 +780,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
mergeBaseDir = Directories.getInstance().getNextFolderForTsfile();
mergeFileName = minimumTime
- + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis()
+ + EngingeConstants.TSFILE_NAME_SEPARATOR + System.currentTimeMillis()
+ MERGE_TEMP_SUFFIX;
mergeOutputPath = constructOutputFilePath(mergeBaseDir, getProcessorName(),
mergeFileName);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/TsFileResource.java
similarity index 97%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/TsFileResource.java
index d8ba95f..4927ba9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/TsFileResource.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.engine.filenode;
+package org.apache.iotdb.db.engine.sgmanager;
import java.io.File;
import java.io.IOException;
@@ -28,11 +28,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.locks.ReadWriteLock;
import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
+import org.apache.iotdb.db.engine.tsfiledata.RestorableTsFileIOWriter;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
@@ -53,7 +51,7 @@ public class TsFileResource {
private Map<String, Long> endTimeMap;
private Set<String> mergeChanged = new HashSet<>();
- private transient ModificationFile modFile;
+ private ModificationFile modFile;
/**
* @param autoRead whether read the file to initialize startTimeMap and endTimeMap
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/RestorableTsFileIOWriter.java
similarity index 99%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/RestorableTsFileIOWriter.java
index a7542b5..7fa9190 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/RestorableTsFileIOWriter.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.engine.bufferwrite;
+package org.apache.iotdb.db.engine.tsfiledata;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index c729285..d787609 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@ -42,9 +42,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.Processor;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
-import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.EngingeConstants;
+import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -60,7 +59,6 @@ import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
import org.apache.iotdb.db.engine.sgmanager.OperationResult;
import org.apache.iotdb.db.engine.sgmanager.StorageGroupProcessor;
import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.FileNodeProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -236,12 +234,12 @@ public class TsFileProcessor extends Processor {
throws TsFileProcessorException {
File[] tsFiles = dataFolder
.listFiles(x -> !x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX)
- && x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR).length == 2);
+ && x.getName().split(EngingeConstants.TSFILE_NAME_SEPARATOR).length == 2);
if (tsFiles == null || tsFiles.length == 0) {
return;
}
Arrays.sort(tsFiles, Comparator.comparingLong(x -> Long
- .parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
+ .parseLong(x.getName().split(EngingeConstants.TSFILE_NAME_SEPARATOR)[0])));
for (File tsfile : tsFiles) {
if (tsfile.getName().endsWith(StorageGroupProcessor.MERGE_TEMP_SUFFIX)) {
@@ -259,7 +257,7 @@ public class TsFileProcessor extends Processor {
// add one TsFiles to tsFileResources and update device inserted time map
private void addResource(File tsfile) throws TsFileProcessorException {
//TODO we'd better define a file suffix for TsFile, e.g., .ts
- String[] names = tsfile.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR);
+ String[] names = tsfile.getName().split(EngingeConstants.TSFILE_NAME_SEPARATOR);
long time = Long.parseLong(names[0]);
if (fileNamePrefix < time) {
fileNamePrefix = time;
@@ -299,7 +297,7 @@ public class TsFileProcessor extends Processor {
LOGGER.debug("The bufferwrite processor data dir doesn't exists, create new directory {}.",
dataFolder.getAbsolutePath());
}
- String fileName = (fileNamePrefix + 1) + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
+ String fileName = (fileNamePrefix + 1) + EngingeConstants.TSFILE_NAME_SEPARATOR
+ System.currentTimeMillis();
return new File(dataFolder, fileName);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/IStatistic.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/IStatistic.java
index 36c83da..a25d595 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/IStatistic.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/IStatistic.java
@@ -23,7 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
public interface IStatistic {
@@ -32,7 +32,7 @@ public interface IStatistic {
* @return A HashMap that contains the module seriesPath like: root.stats.write.global,
* and its value is TSRecord format contains all statistics measurement
*/
- Map<String, TSRecord> getAllStatisticsValue();
+ Map<String, InsertPlan> getAllStatisticsValue();
/**
* registerStatMetadata registers statistics info to the manager.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index b04e506..2622bcd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -31,7 +32,8 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.DatabaseEngine;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.StartupException;
@@ -39,6 +41,7 @@ import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.MonitorConstants.StorageGroupManagerStatConstants;
import org.apache.iotdb.db.monitor.MonitorConstants.StorageGroupProcessorStatConstants;
import org.apache.iotdb.db.monitor.collector.FileSize;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -107,22 +110,25 @@ public class StatMonitor implements IService {
}
/**
- * generate TSRecord.
+ * generate InsertPlan.
*
* @param hashMap key is statParams name, values is AtomicLong type
* @param statGroupDeltaName is the deviceId seriesPath of this module
* @param curTime current time stamp
- * @return TSRecord contains the DataPoints of a statGroupDeltaName
+ * @return InsertPlan contains the DataPoints of a statGroupDeltaName
*/
- public static TSRecord convertToTSRecord(Map<String, AtomicLong> hashMap,
+ public static InsertPlan convertToInsertPlan(Map<String, AtomicLong> hashMap,
String statGroupDeltaName, long curTime) {
- TSRecord tsRecord = new TSRecord(curTime, statGroupDeltaName);
- tsRecord.dataPointList = new ArrayList<>();
+
+ List<String> measurements = new ArrayList<>();
+ List<String> values = new ArrayList<>();
for (Map.Entry<String, AtomicLong> entry : hashMap.entrySet()) {
AtomicLong value = entry.getValue();
- tsRecord.dataPointList.add(new LongDataPoint(entry.getKey(), value.get()));
+ measurements.add(entry.getKey());
+ values.add(String.valueOf(value.get()));
}
- return tsRecord;
+ return new InsertPlan(statGroupDeltaName, curTime,
+ measurements.toArray(new String[0]), values.toArray(new String[0]));
}
public long getNumPointsInsert() {
@@ -218,53 +224,24 @@ public class StatMonitor implements IService {
}
/**
- * This function is not used and need to complete the query key concept.
- *
- * @return TSRecord, query statistics params
- */
- public Map<String, TSRecord> getOneStatisticsValue(String key) {
- // queryPath like fileNode seriesPath: root.stats.car1,
- // or FileNodeManager seriesPath:FileNodeManager
- String queryPath;
- if (key.contains("\\.")) {
- queryPath =
- MonitorConstants.STAT_STORAGE_GROUP_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR
- + key.replaceAll("\\.", "_");
- } else {
- queryPath = key;
- }
- if (statisticMap.containsKey(queryPath)) {
- return statisticMap.get(queryPath).getAllStatisticsValue();
- } else {
- long currentTimeMillis = System.currentTimeMillis();
- HashMap<String, TSRecord> hashMap = new HashMap<>();
- TSRecord tsRecord = convertToTSRecord(
- MonitorConstants.initValues(MonitorConstants.FILENODE_PROCESSOR_CONST), queryPath,
- currentTimeMillis);
- hashMap.put(queryPath, tsRecord);
- return hashMap;
- }
- }
-
- /**
* get all statistics.
*/
- public Map<String, TSRecord> gatherStatistics() {
+ public Map<String, InsertPlan> gatherStatistics() {
synchronized (statisticMap) {
long currentTimeMillis = System.currentTimeMillis();
- HashMap<String, TSRecord> tsRecordHashMap = new HashMap<>();
+ HashMap<String, InsertPlan> tsRecordHashMap = new HashMap<>();
for (Map.Entry<String, IStatistic> entry : statisticMap.entrySet()) {
if (entry.getValue() == null) {
switch (entry.getKey()) {
case MonitorConstants.STAT_STORAGE_DELTA_NAME:
tsRecordHashMap.put(entry.getKey(),
- convertToTSRecord(
+ convertToInsertPlan(
MonitorConstants.initValues(MonitorConstants.FILENODE_PROCESSOR_CONST),
entry.getKey(), currentTimeMillis));
break;
case MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME:
tsRecordHashMap.put(entry.getKey(),
- convertToTSRecord(
+ convertToInsertPlan(
MonitorConstants.initValues(MonitorConstants.FILE_SIZE_CONST),
entry.getKey(), currentTimeMillis));
break;
@@ -274,8 +251,8 @@ public class StatMonitor implements IService {
tsRecordHashMap.putAll(entry.getValue().getAllStatisticsValue());
}
}
- for (TSRecord value : tsRecordHashMap.values()) {
- value.time = currentTimeMillis;
+ for (InsertPlan value : tsRecordHashMap.values()) {
+ value.setTime(currentTimeMillis);
}
return tsRecordHashMap;
}
@@ -348,12 +325,12 @@ public class StatMonitor implements IService {
if (seconds >= statMonitorDetectFreqSec) {
runningTimeMillis = currentTimeMillis;
// delete time-series data
- FileNodeManager fManager = FileNodeManager.getInstance();
+ DatabaseEngine dbEngine = DatabaseEngineFactory.getCurrent();
try {
for (Map.Entry<String, IStatistic> entry : statisticMap.entrySet()) {
for (String statParamName : entry.getValue().getStatParamsHashMap().keySet()) {
if (temporaryStatList.contains(statParamName)) {
- fManager.delete(entry.getKey(), statParamName,
+ dbEngine.deleteData(entry.getKey(), statParamName,
currentTimeMillis - statMonitorRetainIntervalSec * 1000);
}
}
@@ -364,7 +341,7 @@ public class StatMonitor implements IService {
e);
}
}
- Map<String, TSRecord> tsRecordHashMap = gatherStatistics();
+ Map<String, InsertPlan> tsRecordHashMap = gatherStatistics();
insert(tsRecordHashMap);
numBackLoop.incrementAndGet();
} catch (Exception e) {
@@ -372,14 +349,14 @@ public class StatMonitor implements IService {
}
}
- public void insert(Map<String, TSRecord> tsRecordHashMap) {
- FileNodeManager fManager = FileNodeManager.getInstance();
+ public void insert(Map<String, InsertPlan> tsRecordHashMap) {
+ DatabaseEngine dbEngine = DatabaseEngineFactory.getCurrent();
int pointNum;
- for (Map.Entry<String, TSRecord> entry : tsRecordHashMap.entrySet()) {
+ for (Map.Entry<String, InsertPlan> entry : tsRecordHashMap.entrySet()) {
try {
- fManager.insert(entry.getValue(), true);
+ dbEngine.insert(entry.getValue(), true);
numInsert.incrementAndGet();
- pointNum = entry.getValue().dataPointList.size();
+ pointNum = entry.getValue().getValues().length;
numPointsInsert.addAndGet(pointNum);
} catch (StorageGroupManagerException e) {
numInsertError.incrementAndGet();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java
index b3e695f..6f780cf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java
@@ -30,12 +30,14 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.DatabaseEngine;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.monitor.IStatistic;
import org.apache.iotdb.db.monitor.MonitorConstants;
import org.apache.iotdb.db.monitor.MonitorConstants.FileSizeConstants;
import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -54,13 +56,13 @@ public class FileSize implements IStatistic {
private static final Logger LOGGER = LoggerFactory.getLogger(FileSize.class);
private static final long ABNORMAL_VALUE = -1L;
private static final long INIT_VALUE_IF_FILE_NOT_EXIST = 0L;
- private FileNodeManager fileNodeManager;
+ private DatabaseEngine databaseEngine;
@Override
- public Map<String, TSRecord> getAllStatisticsValue() {
+ public Map<String, InsertPlan> getAllStatisticsValue() {
long curTime = System.currentTimeMillis();
TSRecord tsRecord = StatMonitor
- .convertToTSRecord(getStatParamsHashMap(), MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME,
+ .convertToInsertPlan(getStatParamsHashMap(), MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME,
curTime);
HashMap<String, TSRecord> ret = new HashMap<>();
ret.put(MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME, tsRecord);
@@ -77,11 +79,11 @@ public class FileSize implements IStatistic {
hashMap.put(seriesPath, MonitorConstants.DATA_TYPE_INT64);
Path path = new Path(seriesPath);
try {
- fileNodeManager.addTimeSeries(path, TSDataType.valueOf(MonitorConstants.DATA_TYPE_INT64),
+ databaseEngine.addTimeSeries(path, TSDataType.valueOf(MonitorConstants.DATA_TYPE_INT64),
TSEncoding.valueOf("RLE"), CompressionType.valueOf(TSFileConfig.compressor),
Collections.emptyMap());
} catch (StorageGroupManagerException e) {
- LOGGER.error("Register File Size Stats into fileNodeManager Failed.", e);
+ LOGGER.error("Register File Size Stats into databaseEngine Failed.", e);
}
}
StatMonitor.getInstance().registerStatStorageGroup(hashMap);
@@ -114,7 +116,7 @@ public class FileSize implements IStatistic {
}
private FileSize() {
- fileNodeManager = FileNodeManager.getInstance();
+ databaseEngine = DatabaseEngineFactory.getCurrent();
if (config.isEnableStatMonitor()) {
StatMonitor statMonitor = StatMonitor.getInstance();
registerStatMetadata();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 2fc51bb..b30da6c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -31,7 +31,8 @@ import org.apache.iotdb.db.auth.entity.PathPrivilege;
import org.apache.iotdb.db.auth.entity.PrivilegeType;
import org.apache.iotdb.db.auth.entity.Role;
import org.apache.iotdb.db.auth.entity.User;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.DatabaseEngine;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.exception.ArgsErrorException;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -63,8 +64,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,11 +72,11 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
private static final Logger LOG = LoggerFactory.getLogger(OverflowQPExecutor.class);
- private FileNodeManager fileNodeManager;
+ private DatabaseEngine databaseEngine;
private MManager mManager = MManager.getInstance();
public OverflowQPExecutor() {
- fileNodeManager = FileNodeManager.getInstance();
+ databaseEngine = DatabaseEngineFactory.getCurrent();
}
public static String checkValue(TSDataType dataType, String value) throws ProcessorException {
@@ -117,9 +116,8 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
return flag;
case INSERT:
InsertPlan insert = (InsertPlan) plan;
- int result = multiInsert(insert.getDeviceId(), insert.getTime(), insert.getMeasurements(),
- insert.getValues());
- return result > 0;
+ multiInsert(insert);
+ return true;
case CREATE_ROLE:
case DELETE_ROLE:
case CREATE_USER:
@@ -215,7 +213,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
mManager.getStorageGroupByPath(fullPath);
TSDataType dataType = mManager.getSeriesType(fullPath);
value = checkValue(dataType, value);
- fileNodeManager.update(deviceId, measurementId, startTime, endTime, dataType, value);
+ databaseEngine.update(deviceId, measurementId, startTime, endTime, dataType, value);
return true;
} catch (PathErrorException e) {
throw new ProcessorException(e.getMessage());
@@ -235,7 +233,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
String.format("Timeseries %s does not exist.", path.getFullPath()));
}
mManager.getStorageGroupByPath(path.getFullPath());
- fileNodeManager.delete(deviceId, measurementId, timestamp);
+ databaseEngine.deleteData(deviceId, measurementId, timestamp);
return true;
} catch (PathErrorException | StorageGroupManagerException e) {
throw new ProcessorException(e);
@@ -243,30 +241,12 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
}
@Override
- public void insert(Path path, long timestamp, String value) throws ProcessorException {
- String deviceId = path.getDevice();
- String measurementId = path.getMeasurement();
-
- try {
- TSDataType type = mManager.getSeriesType(deviceId + "," + measurementId);
- TSRecord tsRecord = new TSRecord(timestamp, deviceId);
- DataPoint dataPoint = DataPoint.getDataPoint(type, measurementId, value);
- tsRecord.addTuple(dataPoint);
- fileNodeManager.insert(tsRecord, false);
-
- } catch (PathErrorException e) {
- throw new ProcessorException("Error in insert: " + e.getMessage());
- } catch (StorageGroupManagerException e) {
- throw new ProcessorException(e);
- }
- }
-
- @Override
- public int multiInsert(String deviceId, long insertTime, String[] measurementList,
- String[] insertValues)
+ public void multiInsert(InsertPlan plan)
throws ProcessorException {
try {
- TSRecord tsRecord = new TSRecord(insertTime, deviceId);
+ String deviceId = plan.getDeviceId();
+ String[] measurementList = plan.getMeasurements();
+ String[] insertValues = plan.getValues();
MNode node = mManager.getNodeByDeviceIdFromCache(deviceId);
@@ -284,12 +264,9 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
}
TSDataType dataType = measurementNode.getSchema().getType();
- String value = insertValues[i];
- value = checkValue(dataType, value);
- DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementList[i], value);
- tsRecord.addTuple(dataPoint);
+ insertValues[i] = checkValue(dataType, insertValues[i]);
}
- return fileNodeManager.insert(tsRecord, false);
+ databaseEngine.insert(plan, false);
} catch (PathErrorException | StorageGroupManagerException e) {
throw new ProcessorException(e.getMessage());
@@ -535,11 +512,11 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
try {
if (isNewMeasurement) {
// add time series to schema
- fileNodeManager.addTimeSeries(path, dataType, encoding, compressor, props);
- //TODO fileNodeManager.addTimeSeries(
+ databaseEngine.addTimeSeries(path, dataType, encoding, compressor, props);
+ //TODO databaseEngine.addTimeSeries(
//TODO path, resultDataType, encoding, compressor, encodingArgs);
}
- // fileNodeManager.closeOneFileNode(namespacePath);
+ // databaseEngine.closeOneFileNode(namespacePath);
} catch (StorageGroupManagerException e) {
throw new ProcessorException(e);
}
@@ -613,10 +590,10 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
closeFileNodes.removeAll(deleteFielNodes);
for (String deleteFileNode : deleteFielNodes) {
// close processor
- fileNodeManager.deleteOneFileNode(deleteFileNode);
+ databaseEngine.deleteStorageGroup(deleteFileNode);
}
for (String closeFileNode : closeFileNodes) {
- fileNodeManager.closeOneFileNode(closeFileNode);
+ databaseEngine.closeStorageGroup(closeFileNode);
}
}
break;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index c89894c..d139f65 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.EngineQueryRouter;
@@ -172,25 +173,11 @@ public abstract class QueryProcessExecutor {
protected abstract boolean delete(Path path, long deleteTime) throws ProcessorException;
/**
- * insert a single value. Only used in test
+ * execute insert command and return whether the operator is successful.
*
- * @param path seriesPath to be inserted
- * @param insertTime - it's time point but not a range
- * @param value value to be inserted
+ * @param plan the InsertPlan
*/
- public abstract void insert(Path path, long insertTime, String value) throws ProcessorException;
-
- /**
- * executeWithGlobalTimeFilter insert command and return whether the operator is successful.
- *
- * @param deviceId deviceId to be inserted
- * @param insertTime - it's time point but not a range
- * @param measurementList measurements to be inserted
- * @param insertValues values to be inserted
- * @return - Operate Type.
- */
- public abstract int multiInsert(String deviceId, long insertTime, String[] measurementList,
- String[] insertValues) throws ProcessorException;
+ public abstract void multiInsert(InsertPlan plan) throws ProcessorException;
public abstract List<String> getAllPaths(String originPath) throws PathErrorException;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
index ad55b7a..84ceeb0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.query.control;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index 2b75efb..310a850 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -27,7 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -45,9 +45,9 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
* QueryResourceManager manages resource (file streams) used by each query job, and assign Ids to the jobs.
* During the life cycle of a query, the following methods must be called in strict order:
* 1. assignJobId - get an Id for the new job.
- * 2. beginQueryOfGivenQueryPaths - remind FileNodeManager that some files are being used
+ * 2. beginQueryOfGivenQueryPaths - remind DatabaseEngine that some files are being used
* 3. (if using filter)beginQueryOfGivenExpression
- * - remind FileNodeManager that some files are being used
+ * - remind DatabaseEngine that some files are being used
* 4. getQueryDataSource - open files for the job or reuse existing readers.
* 5. endQueryForGivenJob - release the resource used by this job.
* </p>
@@ -65,29 +65,29 @@ public class QueryResourceManager {
* <p>
* For example, during a query process Q1, given a query sql <sql>select device_1.sensor_1,
* device_1.sensor_2, device_2.sensor_1, device_2.sensor_2</sql>, we will invoke
- * <code>FileNodeManager.getInstance().beginQuery(device_1)</code> and
- * <code>FileNodeManager.getInstance().beginQuery(device_2)</code> both once. Although there
+ * <code>DatabaseEngineFactory.getCurrent().beginQuery(device_1)</code> and
+ * <code>DatabaseEngineFactory.getCurrent().beginQuery(device_2)</code> both once. Although there
* exists four paths, but the unique devices are only `device_1` and `device_2`. When invoking
- * <code>FileNodeManager.getInstance().beginQuery(device_1)</code>, it returns result token `1`.
+ * <code>DatabaseEngineFactory.getCurrent().beginQuery(device_1)</code>, it returns result token `1`.
* Similarly,
- * <code>FileNodeManager.getInstance().beginQuery(device_2)</code> returns result token `2`.
+ * <code>DatabaseEngineFactory.getCurrent().beginQuery(device_2)</code> returns result token `2`.
*
* In the meanwhile, another query process Q2 aroused by other client is triggered, whose sql
- * statement is same to Q1. Although <code>FileNodeManager.getInstance().beginQuery(device_1)
+ * statement is same to Q1. Although <code>DatabaseEngineFactory.getCurrent().beginQuery(device_1)
* </code>
* and
- * <code>FileNodeManager.getInstance().beginQuery(device_2)</code> will be invoked again, it
+ * <code>DatabaseEngineFactory.getCurrent().beginQuery(device_2)</code> will be invoked again, it
* returns result token `3` and `4` .
*
- * <code>FileNodeManager.getInstance().endQueryForGivenJob(device_1, 1)</code> and
- * <code>FileNodeManager.getInstance().endQueryForGivenJob(device_2, 2)</code> must be invoked no matter how
+ * <code>DatabaseEngineFactory.getCurrent().endQueryForGivenJob(device_1, 1)</code> and
+ * <code>DatabaseEngineFactory.getCurrent().endQueryForGivenJob(device_2, 2)</code> must be invoked no matter how
* query process Q1 exits normally or abnormally. So is Q2,
- * <code>FileNodeManager.getInstance().endQueryForGivenJob(device_1, 3)</code> and
- * <code>FileNodeManager.getInstance().endQueryForGivenJob(device_2, 4)</code> must be invoked
+ * <code>DatabaseEngineFactory.getCurrent().endQueryForGivenJob(device_1, 3)</code> and
+ * <code>DatabaseEngineFactory.getCurrent().endQueryForGivenJob(device_2, 4)</code> must be invoked
*
* Last but no least, to ensure the correctness of write process and query process of IoTDB,
- * <code>FileNodeManager.getInstance().beginQuery()</code> and
- * <code>FileNodeManager.getInstance().endQueryForGivenJob()</code> must be executed rightly.
+ * <code>DatabaseEngineFactory.getCurrent().beginQuery()</code> and
+ * <code>DatabaseEngineFactory.getCurrent().endQueryForGivenJob()</code> must be executed rightly.
* </p>
*/
private ConcurrentHashMap<Long, ConcurrentHashMap<String, List<Integer>>> queryTokensMap;
@@ -125,7 +125,7 @@ public class QueryResourceManager {
for (String deviceId : deviceIdSet) {
putQueryTokenForCurrentRequestThread(jobId, deviceId,
- FileNodeManager.getInstance().beginQuery(deviceId));
+ DatabaseEngineFactory.getCurrent().beginQuery(deviceId));
}
}
@@ -139,7 +139,7 @@ public class QueryResourceManager {
getUniquePaths(expression, deviceIdSet);
for (String deviceId : deviceIdSet) {
putQueryTokenForCurrentRequestThread(jobId, deviceId,
- FileNodeManager.getInstance().beginQuery(deviceId));
+ DatabaseEngineFactory.getCurrent().beginQuery(deviceId));
}
}
@@ -148,7 +148,7 @@ public class QueryResourceManager {
throws StorageGroupManagerException {
SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(selectedPath, null);
- QueryDataSource queryDataSource = FileNodeManager.getInstance()
+ QueryDataSource queryDataSource = DatabaseEngineFactory.getCurrent()
.query(singleSeriesExpression, context);
// add used files to current thread request cached map
@@ -168,7 +168,7 @@ public class QueryResourceManager {
}
for (Map.Entry<String, List<Integer>> entry : queryTokensMap.get(jobId).entrySet()) {
for (int token : entry.getValue()) {
- FileNodeManager.getInstance().endQuery(entry.getKey(), token);
+ DatabaseEngineFactory.getCurrent().endQuery(entry.getKey(), token);
}
}
queryTokensMap.remove(jobId);
@@ -198,26 +198,4 @@ public class QueryResourceManager {
}
}
-
-
- /**
- * TODO
- * This is only for test TsFileProcessor now. This method will finally be replaced when
- * fileNodeManager is refactored
- */
- public QueryDataSource getQueryDataSourceByTsFileProcessor(Path selectedPath,
- QueryContext context, TsFileProcessor processor)
- throws IOException, StorageGroupManagerException {
- OverflowSeriesDataSource overflowSeriesDataSource = new OverflowSeriesDataSource(selectedPath);
- overflowSeriesDataSource.setOverflowInsertFileList(Collections.EMPTY_LIST);
-
- SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(selectedPath, null);
- SeriesDataSource dataSource =processor.query(singleSeriesExpression, context);
- QueryDataSource queryDataSource = new QueryDataSource(dataSource, overflowSeriesDataSource);
- // add used files to current thread request cached map
- filePathsManager.addUsedFilesForGivenJob(context.getJobId(), queryDataSource);
-
- return queryDataSource;
-
- }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
index d479be9..2819915 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
@@ -92,7 +92,8 @@ public class EngineExecutorWithoutTimeGenerator {
PriorityMergeReader unSeqMergeReader;
try {
unSeqMergeReader = SeriesReaderFactory.getInstance()
- .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
+ .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(),
+ context, timeFilter);
} catch (IOException e) {
throw new StorageGroupManagerException(e);
}
@@ -146,62 +147,8 @@ public class EngineExecutorWithoutTimeGenerator {
PriorityMergeReader unSeqMergeReader;
try {
unSeqMergeReader = SeriesReaderFactory.getInstance()
- .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
- } catch (IOException e) {
- throw new StorageGroupManagerException(e);
- }
-
- // merge sequence data with unsequence data.
- readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
- }
-
- try {
- return new EngineDataSetWithoutTimeGenerator(queryExpression.getSelectedSeries(), dataTypes,
- readersOfSelectedSeries);
- } catch (IOException e) {
- throw new StorageGroupManagerException(e);
- }
- }
- /**
- * TODO
- * This is only for test TsFileProcessor now. This method will finally be replaced when
- * fileNodeManager is refactored
- */
- public QueryDataSet executeWithoutFilter(QueryContext context, TsFileProcessor processor)
- throws StorageGroupManagerException, IOException {
-
- List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
- List<TSDataType> dataTypes = new ArrayList<>();
-
- QueryResourceManager.getInstance()
- .beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
-
- for (Path path : queryExpression.getSelectedSeries()) {
-
- QueryDataSource queryDataSource = QueryResourceManager.getInstance()
- .getQueryDataSourceByTsFileProcessor(path, context, processor);
-
- // add data type
- try {
- dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
- } catch (PathErrorException e) {
- throw new StorageGroupManagerException(e);
- }
-
- // sequence insert data
- SequenceDataReader tsFilesReader;
- try {
- tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- null, context);
- } catch (IOException e) {
- throw new StorageGroupManagerException(e);
- }
-
- // unseq insert data
- PriorityMergeReader unSeqMergeReader;
- try {
- unSeqMergeReader = SeriesReaderFactory.getInstance()
- .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
+ .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), context,
+ null);
} catch (IOException e) {
throw new StorageGroupManagerException(e);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 930e87f..c808b0b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -220,21 +220,4 @@ public class EngineQueryRouter {
return merged;
}
- /**
- * TODO
- * This is only for test TsFileProcessor now. This method will finally be replaced when
- * fileNodeManager is refactored
- */
- public QueryDataSet query(QueryExpression queryExpression, TsFileProcessor processor,
- QueryContext context)
- throws StorageGroupManagerException, IOException {
-
- if (queryExpression.hasQueryFilter()) {
- throw new NotImplementedException("this function is just for test...");
- } else {
- EngineExecutorWithoutTimeGenerator engineExecutor = new EngineExecutorWithoutTimeGenerator(
- queryExpression);
- return engineExecutor.executeWithoutFilter(context, processor);
- }
- }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 608c623..c6d34dc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -19,10 +19,11 @@
package org.apache.iotdb.db.query.factory;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -51,8 +52,10 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerier;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithFilter;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
public class SeriesReaderFactory {
@@ -70,7 +73,7 @@ public class SeriesReaderFactory {
* process, it's no need to maintain the opened file stream.
*/
public PriorityMergeReader createUnSeqMergeReader(
- SeriesDataSource overflowSeriesDataSource, QueryContext context)
+ SeriesDataSource overflowSeriesDataSource, QueryContext context, Filter filter)
throws IOException {
PriorityMergeReader unSeqMergeReader = new PriorityMergeReader();
@@ -87,7 +90,12 @@ public class SeriesReaderFactory {
for (ChunkMetaData chunkMetaData : metaDataList) {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
- ChunkReader chunkReader = new ChunkReaderWithoutFilter(chunk);
+ ChunkReader chunkReader;
+ if (filter == null) {
+ chunkReader = new ChunkReaderWithoutFilter(chunk);
+ } else {
+ chunkReader = new ChunkReaderWithFilter(chunk, filter);
+ }
unSeqMergeReader
.addReaderWithPriority(new EngineChunkReader(chunkReader, tsFileSequenceReader),
@@ -114,7 +122,7 @@ public class SeriesReaderFactory {
SequenceDataReader seqReader = new SequenceDataReader(seqDataSource, null, context);
// unSequence merge reader
- IPointReader unSeqReader = createUnSeqMergeReader(overflowDataSource, context);
+ IPointReader unSeqReader = createUnSeqMergeReader(overflowDataSource, context, null);
return new AllDataReader(seqReader, unSeqReader);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
index 0c44297..e38d673 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
@@ -23,7 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.IBatchReader;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderByTimestamp.java
index e58583c..5778c50 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderByTimestamp.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderByTimestamp.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.query.reader.sequence;
import java.io.IOException;
import java.util.List;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java b/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java
index 0915cd3..3743ad0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.exception.StartupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,7 +177,7 @@ public class CloseMergeService implements IService {
+ "time interval is {}s.", startDateTime, endDateTime, timeInterval / 1000);
mergeAllLastTime = System.currentTimeMillis();
try {
- FileNodeManager.getInstance().mergeAll();
+ DatabaseEngineFactory.getCurrent().mergeAll();
} catch (Exception e) {
LOGGER.error("Merge all error.", e);
}
@@ -204,7 +203,7 @@ public class CloseMergeService implements IService {
+ "time interval is {}s.", startDateTime, endDateTime, timeInterval / 1000);
closeAllLastTime = System.currentTimeMillis();
try {
- FileNodeManager.getInstance().closeAll();
+ DatabaseEngineFactory.getCurrent().closeAll();
} catch (Exception e) {
LOGGER.error("close all error.", e);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 86c667c..e6f768e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.concurrent.IoTDBDefaultThreadExceptionHandler;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -82,7 +81,7 @@ public class IoTDB implements IoTDBMBean {
Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
setUncaughtExceptionHandler();
- FileNodeManager.getInstance().recovery();
+ DatabaseEngineFactory.getCurrent().recovery();
try {
systemDataRecovery();
} catch (RecoverException e) {
@@ -98,7 +97,7 @@ public class IoTDB implements IoTDBMBean {
StatMonitor.getInstance().recovery();
}
- registerManager.register(FileNodeManager.getInstance());
+ registerManager.register(DatabaseEngineFactory.getCurrent());
registerManager.register(MultiFileLogNodeManager.getInstance());
registerManager.register(JMXService.getInstance());
registerManager.register(JDBCService.getInstance());
@@ -154,7 +153,7 @@ public class IoTDB implements IoTDBMBean {
for (String filenodeName : filenodeNames) {
if (writeLogManager.hasWAL(filenodeName)) {
try {
- FileNodeManager.getInstance().recoverFileNode(filenodeName);
+ DatabaseEngineFactory.getCurrent().recoverFileNode(filenodeName);
} catch (StorageGroupManagerException e) {
throw new RecoverException(e);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index ffaefdb..18e1008 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.exception.ArgsErrorException;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -376,17 +376,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
switch (statement) {
case "flush":
try {
- FileNodeManager.getInstance().closeAll();
+ DatabaseEngineFactory.getCurrent().closeAll();
} catch (StorageGroupManagerException e) {
- LOGGER.error("meet error while FileNodeManager closing all!", e);
+ LOGGER.error("meet error while DataBaseEngine closing all!", e);
throw new IOException(e);
}
return true;
case "merge":
try {
- FileNodeManager.getInstance().mergeAll();
+ DatabaseEngineFactory.getCurrent().mergeAll();
} catch (StorageGroupManagerException e) {
- LOGGER.error("meet error while FileNodeManager merging all!", e);
+ LOGGER.error("meet error while DataBaseEngine merging all!", e);
throw new IOException(e);
}
return true;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/SyncUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/SyncUtils.java
index a946112..01d9448 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/SyncUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/SyncUtils.java
@@ -25,7 +25,7 @@ import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
import org.apache.iotdb.db.sync.conf.Constans;
public class SyncUtils {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
index d933dbb..a7292bf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
@@ -43,9 +43,9 @@ import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
-import org.apache.iotdb.db.engine.filenode.OverflowChangeType;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.DatabaseEngine;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
+import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.MetadataArgsErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -81,7 +81,7 @@ public class SyncServiceImpl implements SyncService.Iface {
private static final Logger logger = LoggerFactory.getLogger(SyncServiceImpl.class);
- private static final FileNodeManager fileNodeManager = FileNodeManager.getInstance();
+ private static final DatabaseEngine databaseEngine = DatabaseEngineFactory.getCurrent();
/**
* Metadata manager
**/
@@ -474,12 +474,12 @@ public class SyncServiceImpl implements SyncService.Iface {
);
// call interface of load external file
try {
- if (!fileNodeManager.appendFileToFileNode(storageGroup, fileNode, path)) {
+ if (!databaseEngine.appendFileToStorageGroup(storageGroup, fileNode, path)) {
// it is a file with overflow data
if (config.isUpdate_historical_data_possibility()) {
loadOldData(path);
} else {
- List<String> overlapFiles = fileNodeManager.getOverlapFilesFromFileNode(
+ List<String> overlapFiles = databaseEngine.getOverlapFilesFromStorageGroup(
storageGroup,
fileNode, uuid.get());
if (overlapFiles.isEmpty()) {
@@ -555,11 +555,9 @@ public class SyncServiceImpl implements SyncService.Iface {
}
}
}
- if (insertExecutor
- .multiInsert(deviceId, record.getTimestamp(), measurementList.toArray(new String[]{}),
- insertValues.toArray(new String[]{})) <= 0) {
- throw new IOException("Inserting series data to IoTDB engine has failed.");
- }
+ insertExecutor
+ .multiInsert(new InsertPlan(deviceId, record.getTimestamp(),
+ measurementList.toArray(new String[]{}), insertValues.toArray(new String[]{})));
}
}
} catch (IOException e) {
@@ -633,19 +631,13 @@ public class SyncServiceImpl implements SyncService.Iface {
/** If there has no overlap data with the timeseries, inserting all data in the sync file **/
if (originDataPoints.isEmpty()) {
for (InsertPlan insertPlan : newDataPoints) {
- if (insertExecutor.multiInsert(insertPlan.getDeviceId(), insertPlan.getTime(),
- insertPlan.getMeasurements(), insertPlan.getValues()) <= 0) {
- throw new IOException("Inserting series data to IoTDB engine has failed.");
- }
+ insertExecutor.multiInsert(insertPlan);
}
} else {
/** Compare every data to get valid data **/
for (InsertPlan insertPlan : newDataPoints) {
if (!originDataPoints.contains(insertPlan)) {
- if (insertExecutor.multiInsert(insertPlan.getDeviceId(), insertPlan.getTime(),
- insertPlan.getMeasurements(), insertPlan.getValues()) <= 0) {
- throw new IOException("Inserting series data to IoTDB engine has failed.");
- }
+ insertExecutor.multiInsert(insertPlan);
}
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/LoadDataUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/LoadDataUtils.java
index db5b528..580987f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/LoadDataUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/LoadDataUtils.java
@@ -31,7 +31,8 @@ import java.util.List;
import java.util.Set;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.DatabaseEngine;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
@@ -57,7 +58,7 @@ public class LoadDataUtils {
private int writeInstanceThreshold;
private boolean hasExtra = false;
private long totalPointCount = 0;
- private FileNodeManager fileNodeManager;
+ private DatabaseEngine databaseEngine;
private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
/**
@@ -65,7 +66,7 @@ public class LoadDataUtils {
*/
public LoadDataUtils() {
writeInstanceMap = new HashSet<>();
- fileNodeManager = FileNodeManager.getInstance();
+ databaseEngine = DatabaseEngineFactory.getCurrent();
writeInstanceThreshold = conf.getWriteInstanceThreshold();
}
@@ -151,9 +152,9 @@ public class LoadDataUtils {
}
// appeared before, insert directly
try {
- fileNodeManager.insert(record, false);
+ databaseEngine.insert(record, false);
} catch (StorageGroupManagerException e) {
- logger.error("failed when insert into fileNodeManager, record:{}", line, e);
+ logger.error("failed when insert into databaseEngine, record:{}", line, e);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
index c9621fe..8b2d66f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.utils;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.sgmanager.TsFileResource;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.query.context.QueryContext;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
index 75c5580..3e55df1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
@@ -31,7 +31,6 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.RecoverException;
@@ -285,7 +284,7 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
+ " logs failed to recover, see logs above for details");
}
try {
- FileNodeManager.getInstance().closeOneFileNode(writeLogNode.getFileNodeName());
+ DatabaseEngineFactory.getCurrent().closeOneFileNode(writeLogNode.getFileNodeName());
} catch (StorageGroupManagerException e) {
logger.error("Log node {} cannot perform flush after replaying logs! Because {}",
writeLogNode.getIdentifier(), e.getMessage());
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/FileNodeRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/FileNodeRecoverPerformer.java
index b41542d..033b63c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/FileNodeRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/FileNodeRecoverPerformer.java
@@ -17,8 +17,6 @@
* under the License.
*/
package org.apache.iotdb.db.writelog.recover;
-
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.RecoverException;
import org.slf4j.Logger;
@@ -42,7 +40,7 @@ public class FileNodeRecoverPerformer implements RecoverPerformer {
@Override
public void recover() throws RecoverException {
try {
- FileNodeManager.getInstance().recoverFileNode(getFileNodeName());
+ DatabaseEngineFactory.getCurrent().recoverFileNode(getFileNodeName());
} catch (StorageGroupManagerException e) {
logger.error("Cannot recover filenode {}", identifier);
throw new RecoverException(e);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
index 267ee39..9ea6ed3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
@@ -17,11 +17,10 @@
* under the License.
*/
package org.apache.iotdb.db.writelog.replay;
-
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
-import org.apache.iotdb.db.exception.StorageGroupManagerException;
+import org.apache.iotdb.db.engine.DatabaseEngineFactory;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -30,8 +29,6 @@ import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
public class ConcreteLogReplayer implements LogReplayer {
@@ -61,27 +58,14 @@ public class ConcreteLogReplayer implements LogReplayer {
}
private void multiInsert(InsertPlan insertPlan)
- throws PathErrorException, StorageGroupManagerException {
- String deviceId = insertPlan.getDeviceId();
- long insertTime = insertPlan.getTime();
- String[] measurementList = insertPlan.getMeasurements();
- String[] insertValues = insertPlan.getValues();
-
- TSRecord tsRecord = new TSRecord(insertTime, deviceId);
- for (int i = 0; i < measurementList.length; i++) {
- String pathKey = deviceId + "." + measurementList[i];
- TSDataType dataType = MManager.getInstance().getSeriesType(pathKey);
- String value = insertValues[i];
- DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementList[i], value);
- tsRecord.addTuple(dataPoint);
- }
- FileNodeManager.getInstance().insert(tsRecord, true);
+ throws StorageGroupManagerException {
+ DatabaseEngineFactory.getCurrent().insert(insertPlan, true);
}
private void update(UpdatePlan updatePlan) throws StorageGroupManagerException, PathErrorException {
TSDataType dataType = MManager.getInstance().getSeriesType(updatePlan.getPath().getFullPath());
for (Pair<Long, Long> timePair : updatePlan.getIntervals()) {
- FileNodeManager.getInstance().update(updatePlan.getPath().getDevice(),
+ DatabaseEngineFactory.getCurrent().update(updatePlan.getPath().getDevice(),
updatePlan.getPath().getMeasurement(), timePair.left, timePair.right, dataType,
updatePlan.getValue());
}
@@ -90,10 +74,10 @@ public class ConcreteLogReplayer implements LogReplayer {
private void delete(DeletePlan deletePlan, boolean isOverflow) throws StorageGroupManagerException {
for (Path path : deletePlan.getPaths()) {
if (isOverflow) {
- FileNodeManager.getInstance().deleteOverflow(path.getDevice(), path.getMeasurement(),
+ DatabaseEngineFactory.getCurrent().deleteInOverflow(path.getDevice(), path.getMeasurement(),
deletePlan.getDeleteTime());
} else {
- FileNodeManager.getInstance().deleteBufferWrite(path.getDevice(), path.getMeasurement(),
+ DatabaseEngineFactory.getCurrent().deleteInSeqFile(path.getDevice(), path.getMeasurement(),
deletePlan.getDeleteTime());
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
index 7c62ff7..d90eff0 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.EngingeConstants;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -75,22 +76,22 @@ public class BufferWriteBenchmark {
public static void main(String[] args) throws BufferWriteProcessorException, IOException {
before();
Map<String, Action> parameters = new HashMap<>();
- parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, new Action() {
+ parameters.put(EngingeConstants.BUFFERWRITE_FLUSH_ACTION, new Action() {
@Override
public void act() throws ActionException {
- System.out.println(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
+ System.out.println(EngingeConstants.BUFFERWRITE_FLUSH_ACTION);
}
});
- parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, new Action() {
+ parameters.put(EngingeConstants.BUFFERWRITE_CLOSE_ACTION, new Action() {
@Override
public void act() throws ActionException {
- System.out.println(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
+ System.out.println(EngingeConstants.BUFFERWRITE_CLOSE_ACTION);
}
});
- parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, new Action() {
+ parameters.put(EngingeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, new Action() {
@Override
public void act() throws ActionException {
- System.out.println(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
+ System.out.println(EngingeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
}
});
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
index 6299215..90a101b 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.EngingeConstants;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
@@ -85,9 +86,9 @@ public class BufferWriteProcessorNewTest {
@Before
public void setUp() throws Exception {
- parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
- parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
- parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
+ parameters.put(EngingeConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
+ parameters.put(EngingeConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
+ parameters.put(EngingeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
MetadataManagerHelper.initMetadata();
EnvironmentUtils.envSetUp();
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
index 3956736..2ad558e 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.EngingeConstants;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.PathUtils;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -96,9 +97,9 @@ public class BufferWriteProcessorTest {
@Before
public void setUp() throws Exception {
- parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
- parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
- parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
+ parameters.put(EngingeConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
+ parameters.put(EngingeConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
+ parameters.put(EngingeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
// origin value
groupSizeInByte = TsFileConf.groupSizeInByte;
// new value
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
index f2c393a..3054b05 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
import org.apache.iotdb.db.engine.memtable.MemTableTestUtils;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.engine.tsfiledata.RestorableTsFileIOWriter;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodev2/FileNodeManagerBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodev2/FileNodeManagerBenchmark.java
index 6e066af..951523c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodev2/FileNodeManagerBenchmark.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodev2/FileNodeManagerBenchmark.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.filenodev2;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.filenode.DatabaseEngine;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.MetadataArgsErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -36,7 +36,7 @@ import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
/**
* Bench The filenode manager with mul-thread and get its performance.
*/
-public class FileNodeManagerBenchmark {
+public class DatabaseEngineBenchmark {
private static int numOfWoker = 10;
private static int numOfDevice = 10;
@@ -113,7 +113,7 @@ public class FileNodeManagerBenchmark {
long time = RandomNum.getRandomLong(1, seed);
String deltaObject = devices[(int) (time % numOfDevice)];
TSRecord tsRecord = getRecord(deltaObject, time);
- FileNodeManager.getInstance().insert(tsRecord, true);
+ DatabaseEngine.getInstance().insert(tsRecord, true);
}
} catch (StorageGroupManagerException e) {
e.printStackTrace();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java
index ca9e59d..56a130a 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.db.engine.PathUtils;
import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.ActionException;
import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.EngingeConstants;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -130,9 +130,9 @@ public class BufferwriteFileSizeControlTest {
new File(filename).delete();
Map<String, Action> parameters = new HashMap<>();
- parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
- parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
- parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
+ parameters.put(EngingeConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
+ parameters.put(EngingeConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
+ parameters.put(EngingeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
try {
processor = new BufferWriteProcessor(Directories.getInstance().getTsFolderForTest(), nsp,
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java
index a800a80..86a16b0 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.db.engine.PathUtils;
import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.ActionException;
import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.EngingeConstants;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -130,9 +130,9 @@ public class BufferwriteMetaSizeControlTest {
new File(filename).delete();
Map<String, Action> parameters = new HashMap<>();
- parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
- parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
- parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
+ parameters.put(EngingeConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
+ parameters.put(EngingeConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
+ parameters.put(EngingeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
try {
processor = new BufferWriteProcessor(Directories.getInstance().getTsFolderForTest(), nsp,
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java
index ddf7a3b..72b4658 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.EngingeConstants;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
import org.apache.iotdb.db.exception.OverflowProcessorException;
@@ -78,14 +78,14 @@ public class OverflowFileSizeControlTest {
}
};
- private Action filenodemanagerbackupaction = new Action() {
+ private Action DatabaseEnginebackupaction = new Action() {
@Override
public void act() throws ActionException {
}
};
- private Action filenodemanagerflushaction = new Action() {
+ private Action DatabaseEngineflushaction = new Action() {
@Override
public void act() throws ActionException {
@@ -95,8 +95,8 @@ public class OverflowFileSizeControlTest {
@Before
public void setUp() throws Exception {
parameters = new HashMap<>();
- parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowflushaction);
- parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, filenodeflushaction);
+ parameters.put(EngingeConstants.OVERFLOW_FLUSH_ACTION, overflowflushaction);
+ parameters.put(EngingeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, filenodeflushaction);
overflowFileSize = dbConfig.getOverflowFileSizeThreshold();
groupSize = tsconfig.groupSizeInByte;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java
index 2334ee7..8235954 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.EngingeConstants;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
import org.apache.iotdb.db.exception.OverflowProcessorException;
@@ -78,14 +78,14 @@ public class OverflowMetaSizeControlTest {
}
};
- private Action filenodemanagerbackupaction = new Action() {
+ private Action DatabaseEnginebackupaction = new Action() {
@Override
public void act() throws ActionException {
}
};
- private Action filenodemanagerflushaction = new Action() {
+ private Action DatabaseEngineflushaction = new Action() {
@Override
public void act() throws ActionException {
@@ -95,8 +95,8 @@ public class OverflowMetaSizeControlTest {
@Before
public void setUp() throws Exception {
parameters = new HashMap<String, Action>();
- parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowflushaction);
- parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, filenodeflushaction);
+ parameters.put(EngingeConstants.OVERFLOW_FLUSH_ACTION, overflowflushaction);
+ parameters.put(EngingeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, filenodeflushaction);
overflowFileSize = dbConfig.getOverflowMetaSizeThreshold();
groupSize = tsconfig.groupSizeInByte;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index 47b0d89..2f22670 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -31,7 +31,7 @@ import java.util.Collections;
import java.util.Iterator;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.filenode.DatabaseEngine;
import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
@@ -77,7 +77,7 @@ public class DeletionFileNodeTest {
for (int i = 0; i < 10; i++) {
MManager.getInstance().addPathToMTree(processorName + "." + measurements[i], dataType,
encoding);
- FileNodeManager.getInstance()
+ DatabaseEngine.getInstance()
.addTimeSeries(new Path(processorName, measurements[i]), TSDataType.valueOf(dataType),
TSEncoding.valueOf(encoding), CompressionType.valueOf(TSFileConfig.compressor),
Collections.emptyMap());
@@ -98,13 +98,13 @@ public class DeletionFileNodeTest {
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[3], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[4], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 30);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
SingleSeriesExpression expression = new SingleSeriesExpression(new Path(processorName,
measurements[5]), null);
@@ -130,13 +130,13 @@ public class DeletionFileNodeTest {
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().closeAll();
+ DatabaseEngine.getInstance().closeAll();
- FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[4], 40);
- FileNodeManager.getInstance().delete(processorName, measurements[3], 30);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[4], 40);
+ DatabaseEngine.getInstance().delete(processorName, measurements[3], 30);
Modification[] realModifications = new Modification[]{
new Deletion(processorName + "." + measurements[5], 102, 50),
@@ -173,9 +173,9 @@ public class DeletionFileNodeTest {
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().closeAll();
+ DatabaseEngine.getInstance().closeAll();
// insert into Overflow
for (int i = 1; i <= 100; i++) {
@@ -183,13 +183,13 @@ public class DeletionFileNodeTest {
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[3], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[4], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 30);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
SingleSeriesExpression expression = new SingleSeriesExpression(new Path(processorName,
measurements[5]), null);
@@ -218,9 +218,9 @@ public class DeletionFileNodeTest {
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().closeAll();
+ DatabaseEngine.getInstance().closeAll();
// insert into Overflow
for (int i = 1; i <= 100; i++) {
@@ -228,13 +228,13 @@ public class DeletionFileNodeTest {
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().closeAll();
+ DatabaseEngine.getInstance().closeAll();
- FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[4], 40);
- FileNodeManager.getInstance().delete(processorName, measurements[3], 30);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[4], 40);
+ DatabaseEngine.getInstance().delete(processorName, measurements[3], 30);
Modification[] realModifications = new Modification[]{
new Deletion(processorName + "." + measurements[5], 103, 50),
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
index 70e2de4..21f00db 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
@@ -26,7 +26,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.filenode.DatabaseEngine;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.MetadataArgsErrorException;
@@ -72,7 +72,7 @@ public class DeletionQueryTest {
for (int i = 0; i < 10; i++) {
MManager.getInstance().addPathToMTree(processorName + "." + measurements[i], dataType,
encoding);
- FileNodeManager.getInstance()
+ DatabaseEngine.getInstance()
.addTimeSeries(new Path(processorName, measurements[i]), TSDataType.valueOf(dataType),
TSEncoding.valueOf(encoding), CompressionType.valueOf(TSFileConfig.compressor),
Collections.emptyMap());
@@ -93,13 +93,13 @@ public class DeletionQueryTest {
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[3], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[4], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 30);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
@@ -124,13 +124,13 @@ public class DeletionQueryTest {
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().closeAll();
+ DatabaseEngine.getInstance().closeAll();
- FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[4], 40);
- FileNodeManager.getInstance().delete(processorName, measurements[3], 30);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[4], 40);
+ DatabaseEngine.getInstance().delete(processorName, measurements[3], 30);
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
@@ -156,9 +156,9 @@ public class DeletionQueryTest {
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().closeAll();
+ DatabaseEngine.getInstance().closeAll();
// insert into Overflow
for (int i = 1; i <= 100; i++) {
@@ -166,13 +166,13 @@ public class DeletionQueryTest {
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[3], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[4], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 30);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
@@ -198,9 +198,9 @@ public class DeletionQueryTest {
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().closeAll();
+ DatabaseEngine.getInstance().closeAll();
// insert into Overflow
for (int i = 1; i <= 100; i++) {
@@ -208,13 +208,13 @@ public class DeletionQueryTest {
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().closeAll();
+ DatabaseEngine.getInstance().closeAll();
- FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[4], 40);
- FileNodeManager.getInstance().delete(processorName, measurements[3], 30);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[4], 40);
+ DatabaseEngine.getInstance().delete(processorName, measurements[3], 30);
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
@@ -240,45 +240,45 @@ public class DeletionQueryTest {
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[3], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[4], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 30);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
- FileNodeManager.getInstance().forceFlush(UsageLevel.DANGEROUS);
+ DatabaseEngine.getInstance().forceFlush(UsageLevel.DANGEROUS);
for (int i = 101; i <= 200; i++) {
TSRecord record = new TSRecord(i, processorName);
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().delete(processorName, measurements[3], 250);
- FileNodeManager.getInstance().delete(processorName, measurements[4], 250);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 230);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 250);
+ DatabaseEngine.getInstance().delete(processorName, measurements[3], 250);
+ DatabaseEngine.getInstance().delete(processorName, measurements[4], 250);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 230);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 250);
- FileNodeManager.getInstance().forceFlush(UsageLevel.DANGEROUS);
+ DatabaseEngine.getInstance().forceFlush(UsageLevel.DANGEROUS);
for (int i = 201; i <= 300; i++) {
TSRecord record = new TSRecord(i, processorName);
for (int j = 0; j < 10; j++) {
record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
}
- FileNodeManager.getInstance().insert(record, false);
+ DatabaseEngine.getInstance().insert(record, false);
}
- FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
- FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[3], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[4], 50);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 30);
+ DatabaseEngine.getInstance().delete(processorName, measurements[5], 50);
- FileNodeManager.getInstance().closeAll();
+ DatabaseEngine.getInstance().closeAll();
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java
index 4fac693..8942e7c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.EngingeConstants;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.OverflowProcessorException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -82,16 +82,16 @@ public class OverflowProcessorBenchmark {
public static void main(String[] args) throws IOException, OverflowProcessorException {
Map<String, Action> parameters = new HashMap<>();
- parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, new Action() {
+ parameters.put(EngingeConstants.OVERFLOW_FLUSH_ACTION, new Action() {
@Override
public void act() throws ActionException {
- System.out.println(FileNodeConstants.OVERFLOW_FLUSH_ACTION);
+ System.out.println(EngingeConstants.OVERFLOW_FLUSH_ACTION);
}
});
- parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, new Action() {
+ parameters.put(EngingeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, new Action() {
@Override
public void act() throws ActionException {
- System.out.println(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
+ System.out.println(EngingeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
}
});
OverflowProcessor overflowProcessor = new OverflowProcessor("Overflow_bench", parameters,
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
index 9877f76..b366c21 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.iotdb.db.engine.PathUtils;
import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.EngingeConstants;
import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
@@ -67,8 +67,8 @@ public class OverflowProcessorTest {
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
parameters = new HashMap<String, Action>();
- parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowflushaction);
- parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, filenodeflushaction);
+ parameters.put(EngingeConstants.OVERFLOW_FLUSH_ACTION, overflowflushaction);
+ parameters.put(EngingeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, filenodeflushaction);
}
@After
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java
index cd470c9..d701c7a 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java
@@ -25,7 +25,7 @@ import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
import java.io.IOException;
import java.util.Collections;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.filenode.DatabaseEngine;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.MetadataArgsErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -50,7 +50,7 @@ import org.junit.Test;
public class QueryDataFromUnclosedTsFileIT {
long bufferWriteFileSize;
- FileNodeManager sgManager;
+ DatabaseEngine sgManager;
MManager mManager;
EngineQueryRouter queryManager;
@Before
@@ -61,7 +61,7 @@ public class QueryDataFromUnclosedTsFileIT {
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
bufferWriteFileSize = IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold();
//IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(100);
- sgManager = FileNodeManager.getInstance();
+ sgManager = DatabaseEngine.getInstance();
mManager = MManager.getInstance();
queryManager = new EngineQueryRouter();
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/monitor/MonitorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/monitor/MonitorTest.java
index 5fa1728..f02381d 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/monitor/MonitorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/monitor/MonitorTest.java
@@ -28,7 +28,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.filenode.DatabaseEngine;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.MonitorConstants.FileSizeConstants;
@@ -63,22 +63,22 @@ public class MonitorTest {
}
@Test
- public void testFileNodeManagerMonitorAndAddMetadata() {
- FileNodeManager fManager = FileNodeManager.getInstance();
+ public void testDatabaseEngineMonitorAndAddMetadata() {
+ DatabaseEngine dbEngine = DatabaseEngine.getInstance();
FileSize fileSize = FileSize.getInstance();
statMonitor = StatMonitor.getInstance();
statMonitor.registerStatStorageGroup();
- fManager.getStatParamsHashMap().forEach((key, value) -> value.set(0));
+ dbEngine.getStatParamsHashMap().forEach((key, value) -> value.set(0));
fileSize.getStatParamsHashMap().forEach((key, value) -> value.set(0));
statMonitor.clearIStatisticMap();
- statMonitor.registerStatistics(fManager.getClass().getSimpleName(), fManager);
+ statMonitor.registerStatistics(dbEngine.getClass().getSimpleName(), dbEngine);
statMonitor
.registerStatistics(MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME, FileSize.getInstance());
// add metadata
MManager mManager = MManager.getInstance();
- fManager.registerStatMetadata();
+ dbEngine.registerStatMetadata();
fileSize.registerStatMetadata();
- Map<String, AtomicLong> statParamsHashMap = fManager.getStatParamsHashMap();
+ Map<String, AtomicLong> statParamsHashMap = dbEngine.getStatParamsHashMap();
Map<String, AtomicLong> fileSizeStatsHashMap = fileSize.getStatParamsHashMap();
for (String statParam : statParamsHashMap.keySet()) {
assertTrue(mManager.pathExist(
@@ -103,10 +103,10 @@ public class MonitorTest {
// Get stat data and test right
- Map<String, TSRecord> statHashMap = fManager.getAllStatisticsValue();
+ Map<String, TSRecord> statHashMap = dbEngine.getAllStatisticsValue();
Map<String, TSRecord> fileSizeStatMap = fileSize.getAllStatisticsValue();
- String path = fManager.getAllPathForStatistic().get(0);
+ String path = dbEngine.getAllPathForStatistic().get(0);
String fileSizeStatPath = fileSize.getAllPathForStatistic().get(0);
int pos = path.lastIndexOf('.');
int fileSizeStatPos = fileSizeStatPath.lastIndexOf('.');
@@ -138,7 +138,7 @@ public class MonitorTest {
}
try {
- fManager.deleteAll();
+ dbEngine.deleteAll();
} catch (StorageGroupManagerException e) {
e.printStackTrace();
fail(e.getMessage());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 3f9bffd..0c28697 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.cache.RowGroupBlockMetaDataCache;
import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.filenode.DatabaseEngine;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.StartupException;
@@ -73,7 +73,7 @@ public class EnvironmentUtils {
// tsFileConfig.duplicateIncompletedPage = false;
// clean filenode manager
try {
- if (!FileNodeManager.getInstance().deleteAll()) {
+ if (!DatabaseEngine.getInstance().deleteAll()) {
LOGGER.error("Can't close the filenode manager in EnvironmentUtils");
Assert.fail();
}
@@ -81,7 +81,7 @@ public class EnvironmentUtils {
throw new IOException(e);
}
StatMonitor.getInstance().close();
- FileNodeManager.getInstance().resetFileNodeManager();
+ DatabaseEngine.getInstance().resetDatabaseEngine();
// clean wal
MultiFileLogNodeManager.getInstance().stop();
// clean cache
@@ -92,7 +92,7 @@ public class EnvironmentUtils {
MManager.getInstance().flushObjectToFile();
// delete all directory
cleanAllDir();
- // FileNodeManager.getInstance().reset();
+ // DatabaseEngine.getInstance().reset();
// reset MemController
BasicMemController.getInstance().close();
}
@@ -168,7 +168,7 @@ public class EnvironmentUtils {
} catch (AuthException e) {
throw new StartupException(e.getMessage());
}
- FileNodeManager.getInstance().resetFileNodeManager();
+ DatabaseEngine.getInstance().resetDatabaseEngine();
MultiFileLogNodeManager.getInstance().start();
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);