You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/29 09:28:23 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: add
PrimitiveDataListPool
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new f56f82b add PrimitiveDataListPool
f56f82b is described below
commit f56f82bb0fd1f092be41f3f2fca0e34f83c4abd3
Author: qiaojialin <64...@qq.com>
AuthorDate: Sat Jun 29 17:27:22 2019 +0800
add PrimitiveDataListPool
---
.../java/org/apache/iotdb/db/engine/Processor.java | 217 -------
.../apache/iotdb/db/engine/bufferwrite/Action.java | 28 -
.../db/engine/bufferwrite/ActionException.java | 30 -
.../engine/bufferwrite/BufferWriteProcessor.java | 695 ---------------------
.../db/engine/bufferwrite/FileNodeConstants.java | 44 --
.../bufferwrite/RestorableTsFileIOWriter.java | 328 ----------
.../db/engine/filenode/FileNodeProcessor.java | 4 +-
.../db/engine/filenode/FileNodeProcessorStore.java | 165 -----
.../db/engine/filenode/OverflowChangeType.java | 57 --
.../iotdb/db/engine/filenode/TsFileResource.java | 397 ------------
.../db/engine/filenodeV2/FileNodeManagerV2.java | 8 +-
.../db/engine/filenodeV2/TsFileResourceV2.java | 9 +
.../filenodeV2/UnsealedTsFileProcessorV2.java | 8 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 2 +-
.../db/engine/memtable/DeduplicatedSortedData.java | 65 --
.../db/engine/memtable/IWritableMemChunk.java | 7 -
.../db/engine/memtable/MemTableFlushCallBack.java | 27 -
.../db/engine/memtable/MemTableFlushTask.java | 272 --------
.../db/engine/memtable/MemTableFlushUtil.java | 119 ----
.../iotdb/db/engine/memtable/WritableMemChunk.java | 194 ------
.../iotdb/db/engine/overflow/io/OverflowIO.java | 174 ------
.../db/engine/overflow/io/OverflowResource.java | 372 -----------
.../overflow/io/OverflowedTsFileIOWriter.java | 36 --
.../engine/overflow/metadata/OFFileMetadata.java | 107 ----
.../overflow/metadata/OFRowGroupListMetadata.java | 109 ----
.../overflow/metadata/OFSeriesListMetadata.java | 107 ----
.../db/engine/overflow/utils/MergeStatus.java | 29 -
.../db/engine/overflow/utils/OverflowOpType.java | 29 -
.../querycontext/GlobalSortedSeriesDataSource.java | 10 +-
.../db/engine/querycontext/QueryDataSource.java | 42 --
.../iotdb/db/query/control/JobFileManager.java | 24 -
.../db/query/control/QueryResourceManager.java | 1 -
.../iotdb/db/rescon/PrimitiveDataListPool.java | 167 +++++
.../iotdb/db/sync/receiver/SyncServiceImpl.java | 9 +-
.../org/apache/iotdb/db/tools/TsFileChecker.java | 329 ----------
.../apache/iotdb/db/utils/PrimitiveArrayList.java | 149 -----
.../iotdb/db/utils/PrimitiveArrayListFactory.java | 48 --
.../iotdb/db/utils/PrimitiveArrayListV2.java | 143 -----
.../iotdb/db/utils/PrimitiveDataListPool.java | 111 ----
.../iotdb/db/utils/datastructure/BinaryTVList.java | 63 +-
.../db/utils/datastructure/BooleanTVList.java | 63 +-
.../iotdb/db/utils/datastructure/DoubleTVList.java | 63 +-
.../iotdb/db/utils/datastructure/FloatTVList.java | 63 +-
.../iotdb/db/utils/datastructure/IntTVList.java | 63 +-
.../iotdb/db/utils/datastructure/LongTVList.java | 63 +-
.../iotdb/db/utils/datastructure/TVList.java | 57 +-
.../db/utils/datastructure/TVListAllocator.java | 4 +-
.../memcontrol/BufferwriteFileSizeControlTest.java | 167 -----
.../memcontrol/BufferwriteMetaSizeControlTest.java | 165 -----
.../db/engine/memtable/PrimitiveMemTableTest.java | 3 +-
.../iotdb/db/integration/IoTDBDeletionIT.java | 3 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 4 +-
.../iotdb/db/utils/PrimitiveArrayListV2Test.java | 100 ---
.../iotdb/db/utils/PrimitiveDataListPoolTest.java | 56 --
.../db/utils/datastructure/LongTVListTest.java | 39 --
.../iotdb/db/writelog/recover/LogReplayerTest.java | 2 -
56 files changed, 522 insertions(+), 5128 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
deleted file mode 100644
index eb0028a..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine;
-
-import java.io.IOException;
-import java.util.concurrent.Future;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Processor is used for implementing different processor with different operation.<br>
- *
- * @see BufferWriteProcessor
- */
-@Deprecated
-public abstract class Processor {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class);
- private final ReadWriteLock lock;
-// private long start;
- protected String processorName;
-
- /**
- * Construct processor using name space seriesPath
- *
- * @param processorName
- */
- public Processor(String processorName) {
- this.processorName = processorName;
- this.lock = new ReentrantReadWriteLock();
- }
-
- /**
- * Release the read lock
- */
- public void readUnlock() {
- lock.readLock().unlock();
-// start = System.currentTimeMillis() - start;
-// if (start > 1000) {
-// LOGGER.info("Processor {} hold lock for {}ms", processorName, start, new RuntimeException());
-// }
- }
-
- /**
- * Acquire the read lock
- */
- public void readLock() {
- lock.readLock().lock();
-// start = System.currentTimeMillis();
- }
-
- /**
- * Acquire the insert lock
- */
- public void writeLock() {
- lock.writeLock().lock();
-// start = System.currentTimeMillis();
- }
-
- /**
- * Release the insert lock
- */
- public void writeUnlock() {
-// start = System.currentTimeMillis() - start;
-// if (start > 1000) {
-// LOGGER.info("Processor {} hold lock for {}ms", processorName, start, new RuntimeException());
-// }
- lock.writeLock().unlock();
- }
-
- /**
- * @param isWriteLock
- * true acquire insert lock, false acquire read lock
- */
- public void lock(boolean isWriteLock) {
- if (isWriteLock) {
- lock.writeLock().lock();
- } else {
- lock.readLock().lock();
- }
-// start = System.currentTimeMillis();
- }
-
- public boolean tryLock(boolean isWriteLock) {
- if (isWriteLock) {
- return tryWriteLock();
- } else {
- return tryReadLock();
- }
- }
-
- /**
- * @param isWriteUnlock
- * true releases the write lock, false releases the read lock
- */
- public void unlock(boolean isWriteUnlock) {
-// start = System.currentTimeMillis() - start;
-// if (start > 1000) {
-// LOGGER.info("Processor {} hold lock for {}ms", processorName, start, new RuntimeException());
-// }
- if (isWriteUnlock) {
- writeUnlock();
- } else {
- readUnlock();
- }
- }
-
- /**
- * Get the name space seriesPath
- *
- * @return
- */
- public String getProcessorName() {
- return processorName;
- }
-
- /**
- * Try to get the insert lock
- *
- * @return
- */
- public boolean tryWriteLock() {
- boolean result = lock.writeLock().tryLock();
-// if (result) {
-// start = System.currentTimeMillis();
-// }
- return result;
- }
-
- /**
- * Try to get the read lock
- *
- * @return
- */
- public boolean tryReadLock() {
- boolean result = lock.readLock().tryLock();
-// if (result) {
-// start = System.currentTimeMillis();
-// }
- return result;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((processorName == null) ? 0 : processorName.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- Processor other = (Processor) obj;
- if (processorName == null) {
- if (other.processorName != null) {
- return false;
- }
- } else if (!processorName.equals(other.processorName)) {
- return false;
- }
- return true;
- }
-
- /**
- * Judge whether this processor can be closed.
- *
- * @return true if subclass doesn't have other implementation.
- */
- public abstract boolean canBeClosed();
-
- /**
- * call flush operation asynchronously
- * @return a future that returns true if successfully, otherwise false.
- * @throws IOException
- */
- public abstract Future<Boolean> flush() throws IOException;
-
- /**
- * Close the processor.<br>
- * Notice: this method is not thread-safe
- *
- * @throws IOException
- * @throws ProcessorException
- */
- public abstract void close() throws ProcessorException;
-
- public abstract long memoryUsage();
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/Action.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/Action.java
deleted file mode 100644
index af65a8a..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/Action.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.bufferwrite;
-
-/**
- * Action interface.
- */
-
-public interface Action {
-
- void act() throws ActionException;
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/ActionException.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/ActionException.java
deleted file mode 100644
index 1d76d9f..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/ActionException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.bufferwrite;
-
-public class ActionException extends Exception{
-
- public ActionException(String message) {
- super(message);
- }
-
- public ActionException(Throwable cause) {
- super(cause);
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
deleted file mode 100644
index 99e4169..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ /dev/null
@@ -1,695 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.bufferwrite;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.Processor;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
-import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
-import org.apache.iotdb.db.engine.memtable.MemTablePool;
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.pool.FlushPoolManager;
-import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.monitor.collector.MemTableWriteTimeCost;
-import org.apache.iotdb.db.monitor.collector.MemTableWriteTimeCost.MemTableWriteTimeCostType;
-import org.apache.iotdb.db.qp.constant.DatetimeUtils;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.utils.ImmediateFuture;
-import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated
-public class BufferWriteProcessor extends Processor {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessor.class);
- private RestorableTsFileIOWriter writer;
- private FileSchema fileSchema;
- private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
- private volatile Future<Boolean> closeFuture = new BWCloseFuture(new ImmediateFuture<>(true));
-// private ReentrantLock flushQueryLock = new ReentrantLock();
- private AtomicLong memSize = new AtomicLong();
- // do not use TsFileConfig.groupSizeInByte, it will ignore the config file
- private long memThreshold = TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
- private IMemTable workMemTable;
-
- // each flush task has a flushId, IO task should scheduled by this id
- private long flushId = -1;
- private List<IMemTable> flushingMemTables = new ArrayList<>();
-
- private Action bufferwriteFlushAction;
- //private Action bufferwriteCloseAction;
- private Consumer<BufferWriteProcessor> bufferwriteCloseConsumer;
- private Action filenodeFlushAction;
-
- //lastFlushTime time unit: millisecond
- private long lastFlushTime = -1;
- private long valueCount = 0;
-
- private String baseDir;
- private String insertFilePath;
- private String bufferWriteRelativePath;
-
- private WriteLogNode logNode;
- private VersionController versionController;
-
- private boolean isClosed = false;
-
- private TsFileResource currentTsFileResource;
-
- private Lock flushQueryLock = new ReentrantLock();
-
- /**
- * constructor of BufferWriteProcessor.
- *
- * @param baseDir base dir
- * @param processorName processor name
- * @param fileName file name
- * @param parameters parameters in Map(String, Action) structure
- * @param fileSchema file schema
- * @throws BufferWriteProcessorException BufferWriteProcessorException
- */
- public BufferWriteProcessor(String baseDir, String processorName, String fileName,
- Map<String, Action> parameters, Consumer<BufferWriteProcessor> bufferwriteCloseConsumer,
- VersionController versionController,
- FileSchema fileSchema) throws BufferWriteProcessorException {
- this(baseDir, processorName, fileName, parameters, bufferwriteCloseConsumer, versionController
- , fileSchema, null);
- }
-
- public BufferWriteProcessor(String baseDir, String processorName, String fileName,
- Map<String, Action> parameters, Consumer<BufferWriteProcessor> bufferwriteCloseConsumer,
- VersionController versionController,
- FileSchema fileSchema, TsFileResource tsFileResource) throws BufferWriteProcessorException {
- super(processorName);
- this.fileSchema = fileSchema;
- this.baseDir = baseDir;
- this.currentTsFileResource = tsFileResource;
-
- bufferwriteFlushAction = parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
- //bufferwriteCloseAction = parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
- filenodeFlushAction = parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
- this.bufferwriteCloseConsumer = bufferwriteCloseConsumer;
-
- new File(baseDir, processorName).mkdirs();
- this.insertFilePath = Paths.get(baseDir, processorName, fileName).toString();
- bufferWriteRelativePath = processorName + File.separatorChar + fileName;
-
- open();
- try {
- getLogNode();
- } catch (IOException e) {
- throw new BufferWriteProcessorException(e);
- }
- this.versionController = versionController;
- }
-
- private void open() throws BufferWriteProcessorException {
- try {
- writer = new RestorableTsFileIOWriter(processorName, insertFilePath);
- } catch (IOException e) {
- throw new BufferWriteProcessorException(e);
- }
- long start1 = System.currentTimeMillis();
- workMemTable = MemTablePool.getInstance().getEmptyMemTable(this);
- start1 = System.currentTimeMillis() - start1;
- if (start1 > 1000) {
- LOGGER.info("BufferWriteProcessor.open getEmptyMemtable cost: {}", start1);
- }
- }
-
- /**
- * Only for Test
- *
- * insert one data point to the buffer insert.
- *
- * @param deviceId device name
- * @param measurementId sensor name
- * @param timestamp timestamp of the data point
- * @param dataType the data type of the value
- * @param value data point value
- * @return true if the data point was written to the memtable, false if it was rejected because memory usage is at the dangerous level
- * @throws BufferWriteProcessorException if a flushing operation occurs and failed.
- */
- public boolean write(String deviceId, String measurementId, long timestamp, TSDataType dataType,
- String value)
- throws BufferWriteProcessorException {
- TSRecord record = new TSRecord(timestamp, deviceId);
- DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, value);
- record.addTuple(dataPoint);
- return write(record);
- }
-
- /**
- * write a ts record into the memtable. If the memory usage is beyond the memThreshold, an async
- * flushing operation will be called.
- *
- * @param tsRecord data to be written
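- * @return true if the record was written to the memtable (memory level SAFE or WARNING), false if it was rejected without being written (memory level DANGEROUS)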
- * @throws BufferWriteProcessorException if a flushing operation occurs and failed.
- */
- public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException {
- MemTableWriteTimeCost.getInstance().init();
- long start1 = System.currentTimeMillis();
- long memUsage = MemUtils.getRecordSize(new InsertPlan(tsRecord));
- BasicMemController.UsageLevel level = BasicMemController.getInstance()
- .acquireUsage(this, memUsage);
-
- start1 = System.currentTimeMillis() - start1;
- if (start1 > 1000) {
- LOGGER.info("BufferWriteProcessor.insert step1 cost: {}", start1);
- }
-
- String memory;
- long start2 = System.currentTimeMillis();
- switch (level) {
- case SAFE:
- for (DataPoint dataPoint : tsRecord.dataPointList) {
- workMemTable.write(tsRecord.deviceId, dataPoint.getMeasurementId(), dataPoint.getType(),
- tsRecord.time,
- dataPoint.getValue().toString());
- }
- valueCount++;
- start2 = System.currentTimeMillis() - start2;
- if (start2 > 1000) {
- LOGGER.info("BufferWriteProcessor.insert step2 of SAFE cost: {}", start2);
- Map<MemTableWriteTimeCostType, long[]> map = MemTableWriteTimeCost.getInstance().getTimeCostMaps().get(Thread.currentThread().getName());
- for(MemTableWriteTimeCostType type: MemTableWriteTimeCostType.values()){
- LOGGER.info("In BufferWriteProcessor.insert step2 of SAFE, {} cost {} ms, execute {} times", type, map.get(type)[1], map.get(type)[0]);
- }
- }
- checkMemThreshold4Flush(memUsage);
- return true;
- case WARNING:
- memory = MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage());
- LOGGER.warn("Memory usage will exceed warning threshold, current : {}.", memory);
- for (DataPoint dataPoint : tsRecord.dataPointList) {
- workMemTable.write(tsRecord.deviceId, dataPoint.getMeasurementId(), dataPoint.getType(),
- tsRecord.time,
- dataPoint.getValue().toString());
- }
- valueCount++;
- start2 = System.currentTimeMillis() - start2;
- if (start2 > 1000) {
- LOGGER.info("BufferWriteProcessor.insert step2 of WARNING cost: {}", start2);
- Map<MemTableWriteTimeCostType, long[]> map = MemTableWriteTimeCost.getInstance().getTimeCostMaps().get(Thread.currentThread().getName());
- for(MemTableWriteTimeCostType type: MemTableWriteTimeCostType.values()){
- LOGGER.info("In BufferWriteProcessor.insert step2 of WARNING, {} cost {} ms, execute {} times", type, map.get(type)[1], map.get(type)[0]);
- }
- }
- try {
- flush();
- } catch (IOException e) {
- throw new BufferWriteProcessorException(e);
- }
- return true;
- case DANGEROUS:
- default:
- memory = MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage());
- LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.", memory);
- return false;
- }
- }
-
- private void checkMemThreshold4Flush(long addedMemory) throws BufferWriteProcessorException {
- long start1 = System.currentTimeMillis();
- long newMem = memSize.addAndGet(addedMemory);
- if (newMem > memThreshold) {
- String usageMem = MemUtils.bytesCntToStr(newMem);
- String threshold = MemUtils.bytesCntToStr(memThreshold);
- String processorName = getProcessorName();
- LOGGER.info("The usage of memory {} in bufferwrite processor {} reaches the threshold {}",
- usageMem, processorName, threshold);
- try {
- flush();
- } catch (IOException e) {
- LOGGER.error("Flush bufferwrite error.", e);
- throw new BufferWriteProcessorException(e);
- }
- }
- start1 = System.currentTimeMillis() - start1;
- if (start1 > 1000) {
- LOGGER.info("BufferWriteProcessor.checkMemThreshold4Flush step-1, cost: {}", start1);
- }
- }
-
- /**
- * get the one (or two) chunk(s) in the memtable ( and the other one in flushing status and then
- * compact them into one TimeValuePairSorter). Then get its (or their) ChunkMetadata(s).
- *
- * @param deviceId device id
- * @param measurementId sensor id
- * @param dataType data type
- * @return corresponding chunk data and chunk metadata in memory
- */
- public Pair<ReadOnlyMemChunk, List<ChunkMetaData>> queryBufferWriteData(String deviceId,
- String measurementId, TSDataType dataType, Map<String, String> props) {
- flushQueryLock.lock();
- try {
- MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
- if (flushingMemTables != null) {
- for (int i = flushingMemTables.size() - 1; i >= 0; i--) {
- memSeriesLazyMerger.addMemSeries(flushingMemTables.get(i).query(deviceId, measurementId, dataType, props));
- }
- }
- if (workMemTable != null) {
- memSeriesLazyMerger.addMemSeries(workMemTable.query(deviceId, measurementId, dataType, props));
- }
- // memSeriesLazyMerger has handled the props,
- // so we do not need to handle it again in the following readOnlyMemChunk
- ReadOnlyMemChunk timeValuePairSorter = new ReadOnlyMemChunk(dataType, memSeriesLazyMerger,
- Collections.emptyMap());
- return new Pair<>(timeValuePairSorter,
- writer.getMetadatas(deviceId, measurementId, dataType));
- } finally {
- flushQueryLock.unlock();
- }
- }
-
-
- /**
- * callback handed to MemTableFlushTask: make the writer's metadata visible and remove the memtable from flushingMemTables
- * @param memTable
- */
- private void removeFlushedMemTable(IMemTable memTable, TsFileIOWriter tsFileIOWriter) {
- long start = System.currentTimeMillis();
- this.writeLock();
- ((RestorableTsFileIOWriter) tsFileIOWriter).makeMetadataVisible();
- try {
- flushingMemTables.remove(memTable);
- } finally {
- this.writeUnlock();
- }
- start = System.currentTimeMillis() - start;
- if (start > 1000) {
- LOGGER.info("removeFlushedMemTable is too slow!!! cost: {}ms", start);
- }
- }
-
-
- /**
- * the caller must guarantee no other concurrent caller entering this function.
- *
- * @param displayMessage message that will appear in system log.
- * @param tmpMemTableToFlush
- * @param version the operation version that will tagged on the to be flushed memtable
- * (i.e., ChunkGroup)
- * @return true if successfully.
- */
- private boolean flushTask(String displayMessage,
- IMemTable tmpMemTableToFlush, long version, long flushId) {
- boolean result;
- long flushStartTime = System.currentTimeMillis();
- LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(),
- displayMessage);
- try {
- if (tmpMemTableToFlush != null && !tmpMemTableToFlush.isEmpty()) {
- // flush data
- MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, getProcessorName(), flushId,
- this::removeFlushedMemTable);
- tableFlushTask.flushMemTable(fileSchema, tmpMemTableToFlush, version);
- }
-
- filenodeFlushAction.act();
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- logNode.notifyEndFlush();
- }
- result = true;
- } catch (Exception e) {
- LOGGER.error("The bufferwrite processor {} failed to flush {}.", getProcessorName(), displayMessage, e);
- result = false;
- }
-
- if (LOGGER.isInfoEnabled()) {
- long flushEndTime = System.currentTimeMillis();
- LOGGER.info(
- "The bufferwrite processor {} flush {}, start time is {}, flush end time is {}, "
- + "flush time consumption is {}ms",
- getProcessorName(), displayMessage,
- DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
- DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
- flushEndTime - flushStartTime);
- }
- return result;
- }
-
- // keyword synchronized is added in this method, so that only one flush task can be submitted now.
- @Override
- public synchronized Future<Boolean> flush() throws IOException {
- return flush(false);
- }
-
- // keyword synchronized is added in this method, so that only one flush task can be submitted now.
- private Future<Boolean> flush(boolean isCloseTaskCalled) throws IOException {
- // statistic information for flush
- if (lastFlushTime > 0) {
- if (LOGGER.isInfoEnabled()) {
- long thisFlushTime = System.currentTimeMillis();
- LOGGER.info(
- "The bufferwrite processor {} will submit a flush task."
- + "The last flush time is {}, this flush time is {}, "
- + "flush time interval is {}s", getProcessorName(),
- DatetimeUtils.convertMillsecondToZonedDateTime(lastFlushTime),
- DatetimeUtils.convertMillsecondToZonedDateTime(thisFlushTime),
- (thisFlushTime - lastFlushTime) / 1000);
- }
- }
- lastFlushTime = System.currentTimeMillis();
- // check value count
- if (valueCount > 0) {
- // update the lastUpdatetime, prepare for flush
- try {
- bufferwriteFlushAction.act();
- } catch (Exception e) {
- LOGGER.error("Failed to flush bufferwrite row group when calling the action function.");
- throw new IOException(e);
- }
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- logNode.notifyStartFlush();
- LOGGER.info("BufferWrite Processor {} has notified WAL for flushing.", getProcessorName());
- }
- valueCount = 0;
-
- flushingMemTables.add(workMemTable);
- IMemTable tmpMemTableToFlush = workMemTable;
-
- long start = System.currentTimeMillis();
-
- start = System.currentTimeMillis() - start;
- if (start > 1000) {
- LOGGER.info("BufferWriteProcessor.flush getEmptyMemtable cost: {}", start);
- }
-
- flushId++;
- long version = versionController.nextVersion();
- BasicMemController.getInstance().releaseUsage(this, memSize.get());
- memSize.set(0);
- // switch
- if (isCloseTaskCalled) {
- LOGGER.info(
- "flush memtable for bufferwrite processor {} synchronously for close task.",
- getProcessorName(), FlushPoolManager.getInstance().getWaitingTasksNumber(),
- FlushPoolManager.getInstance().getCorePoolSize());
- flushTask("synchronously", tmpMemTableToFlush, version, flushId);
- flushFuture = new ImmediateFuture<>(true);
- } else {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info(
- "Begin to submit flush task for bufferwrite processor {}, current Flush Queue is {}, core pool size is {}.",
- getProcessorName(), FlushPoolManager.getInstance().getWaitingTasksNumber(),
- FlushPoolManager.getInstance().getCorePoolSize());
- }
- flushFuture = FlushPoolManager.getInstance().submit(() -> flushTask("asynchronously",
- tmpMemTableToFlush, version, flushId));
- }
-
- if (isCloseTaskCalled) {
- workMemTable = null;
- } else {
- workMemTable = MemTablePool.getInstance().getEmptyMemTable(this);
- }
- } else {
- if (isCloseTaskCalled) {
- MemTablePool.getInstance().putBack(workMemTable);
- }
- flushFuture = new ImmediateFuture<>(true);
- }
- return flushFuture;
- }
-
- @Override
- public boolean canBeClosed() {
- return true;
- }
-
- @Override
- public synchronized void close() throws BufferWriteProcessorException {
- if (writer == null) {
- return;
- }
- try {
- // flush data (if there are flushing task, flush() will be blocked) and wait for finishing flush async
- LOGGER.info("Submit a BufferWrite ({}) close task.", getProcessorName());
- closeFuture = new BWCloseFuture(FlushPoolManager.getInstance().submit(() -> closeTask()));
- //now, we omit the future of the closeTask.
- } catch (Exception e) {
- LOGGER
- .error("Failed to close the bufferwrite processor when calling the action function.", e);
- throw new BufferWriteProcessorException(e);
- }
- }
-
- private boolean closeTask() {
- long closeStartTime = System.currentTimeMillis();
- try {
- LOGGER.info("Bufferwrite {} Close Task: begin to wait for the flush.", getProcessorName());
- flush(true);
- LOGGER.info("Bufferwrite {} Close Task: finishing the flush.", getProcessorName());
- // end file
- writer.endFile(fileSchema);
- //FIXME suppose the flush-thread-pool is 2.
- // then if a flush task and a close task are running in the same time
- // and the close task is faster, then writer == null, and the flush task will throw nullpointer
- // exception. Add "synchronized" keyword on both flush and close may solve the issue.
- writer = null;
- // update the IntervalFile for interval list
- bufferwriteCloseConsumer.accept(this);
- // flush the changed information for filenode
- filenodeFlushAction.act();
- // delete the restore for this bufferwrite processor
- if (LOGGER.isInfoEnabled()) {
- long closeEndTime = System.currentTimeMillis();
- LOGGER.info(
- "Close bufferwrite processor {}, the file name is {}, start time is {}, end time is {}, "
- + "time consumption is {}ms",
- getProcessorName(), insertFilePath,
- DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
- DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
- closeEndTime - closeStartTime);
- }
-
- }catch (IOException | ActionException e) {
- LOGGER.error("Close bufferwrite processor {} failed.", getProcessorName(), e);
- return false;
- } finally {
- isClosed = true;
- }
- return true;
- }
-
- @Override
- public long memoryUsage() {
- return memSize.get();
- }
-
- /**
- * get metadata size.
- *
- * @return The sum of all timeseries's metadata size within this file.
- */
- public long getMetaSize() {
- // TODO : [MemControl] implement this
- return 0;
- }
-
- /**
- * get file size.
- *
- * @return The file size of the TsFile corresponding to this processor.
- */
- public long getFileSize() {
- // TODO : save this variable to avoid object creation?
- File file = new File(insertFilePath);
- return file.length() + memoryUsage();
- }
-
- public String getBaseDir() {
- return baseDir;
- }
-
-
- public String getFileRelativePath() {
- return bufferWriteRelativePath;
- }
-
- private String getBufferwriteRestoreFilePath() {
- return writer.getRestoreFilePath();
- }
-
- public boolean isNewProcessor() {
- return writer.isNewResource();
- }
-
- public void setNewProcessor(boolean isNewProcessor) {
- writer.setNewResource(isNewProcessor);
- }
-
- public WriteLogNode getLogNode() throws IOException {
- if (logNode == null) {
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- logNode = MultiFileLogNodeManager.getInstance().getNode(
- logNodePrefix() + new File(insertFilePath).getName());
- }
- }
- return logNode;
- }
-
- public String logNodePrefix() {
- return logNodePrefix(processorName);
- }
-
- public static String logNodePrefix(String processorName) {
- return processorName + "-BufferWrite-";
- }
-
- /**
- * used for test. We can know when the flush() is called.
- * @return the last flush() time. Time unit: millisecond.
- */
- public long getLastFlushTime() {
- return lastFlushTime;
- }
-
- /**
- * used for test. We can block to wait for finishing flushing.
- * @return the future of the flush() task.
- */
- public Future<Boolean> getFlushFuture() {
- return flushFuture;
- }
-
- /**
- * Delete data whose timestamp <= 'timestamp' and belonging to timeseries deviceId.measurementId.
- * Delete data in both working MemTable and flushing MemTable.
- *
- * @param deviceId the deviceId of the timeseries to be deleted.
- * @param measurementId the measurementId of the timeseries to be deleted.
- * @param timestamp the upper-bound of deletion time.
- */
- public void delete(String deviceId, String measurementId, long timestamp)
- throws BufferWriteProcessorException {
- workMemTable.delete(deviceId, measurementId, timestamp);
- // flushing MemTable cannot be directly modified since another thread is reading it
- for (IMemTable memTable : flushingMemTables) {
- if (memTable.containSeries(deviceId, measurementId)) {
- memTable.delete(new Deletion(new Path(deviceId, measurementId), 0, timestamp));
- }
- }
- }
-
- @Override
- public boolean equals(Object o) {
- return this == o;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode());
- }
-
- @Override
- public String toString() {
- return "BufferWriteProcessor in " + insertFilePath;
- }
-
- public String getInsertFilePath() {
- return insertFilePath;
- }
-
- public boolean isClosed() {
- return isClosed;
- }
-
- public TsFileResource getCurrentTsFileResource() {
- return currentTsFileResource;
- }
- public void setCurrentTsFileResource(TsFileResource resource) {
- this.currentTsFileResource = resource;
- }
-
- public Future<Boolean> getCloseFuture() {
- return closeFuture;
- }
-
-
- class BWCloseFuture implements Future<Boolean> {
- Future<Boolean> future;
- public BWCloseFuture(Future<Boolean> closeFuture) {
- this.future = closeFuture;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return future.cancel(mayInterruptIfRunning);
- }
-
- @Override
- public boolean isCancelled() {
- return future.isCancelled();
- }
-
- @Override
- public boolean isDone() {
- return future.isDone();
- }
-
- @Override
- public Boolean get() throws InterruptedException, ExecutionException {
- return flushFuture.get() && future.get();
- }
-
- @Override
- public Boolean get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return flushFuture.get(timeout, unit) && future.get(timeout, unit);
- }
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/FileNodeConstants.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/FileNodeConstants.java
deleted file mode 100644
index 483f7f1..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/FileNodeConstants.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.bufferwrite;
-
-/**
- * Constants for using in bufferwrite, overflow and filenode.
- *
- */
-public class FileNodeConstants {
-
- private FileNodeConstants(){
- }
-
- public static final String FILE_NODE_OPERATOR_TYPE = "OPERATOR_TYPE";
- public static final String TIMESTAMP_KEY = "TIMESTAMP";
- public static final String FILE_NODE = "FILE_NODE";
- public static final String CLOSE_ACTION = "CLOSE_ACTION";
-
- public static final String OVERFLOW_FLUSH_ACTION = "OVERFLOW_FLUSH_ACTION";
- public static final String BUFFERWRITE_FLUSH_ACTION = "BUFFERWRITE_FLUSH_ACTION";
- public static final String BUFFERWRITE_CLOSE_ACTION = "BUFFERWRITE_CLOSE_ACTION";
- public static final String FILENODE_PROCESSOR_FLUSH_ACTION = "FILENODE_PROCESSOR_FLUSH_ACTION";
-
- public static final String MREGE_EXTENSION = "merge";
- public static final String ERR_EXTENSION = "err";
- public static final String PATH_SEPARATOR = ".";
- public static final String BUFFERWRITE_FILE_SEPARATOR = "-";
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
deleted file mode 100644
index d718c1b..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.bufferwrite;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.writer.DefaultTsFileOutput;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A tsfile io writer that has the ability to restore an incomplete tsfile. <br/> An incomplete
- * tsfile represents the file which does not have tsfile metadata in the end. Besides, the last
- * Chunk group data may be broken. This class can slice off the broken Chunk group data, accept
- * writing new data, and finally insert the tsfile metadata. <br/> There are two cases: (1) though
- * the tsfile loses the tsfile metadata in the end, a corresponding. restore file exists. (2) no
- * .restore file, and then the class has to traverse all the data for fixing the file.
- */
-@Deprecated
-public class RestorableTsFileIOWriter extends TsFileIOWriter {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(RestorableTsFileIOWriter.class);
-
- private static final int TS_METADATA_BYTE_SIZE = 4;
- private static final int TS_POSITION_BYTE_SIZE = 8;
-
- public static final String RESTORE_SUFFIX = ".restore";
- private static final String DEFAULT_MODE = "rw";
-
- private int lastFlushedChunkGroupIndex = 0;
- /**
- * chunk group metadata which are not serialized on disk (.restore file).
- */
- private List<ChunkGroupMetaData> append;
-
- /**
- * all chunk group metadata which have been serialized on disk.
- */
- private Map<String, Map<String, List<ChunkMetaData>>> metadatas;
-
- /**
- * unsealed data file.
- */
- private String insertFilePath;
- /**
- * corresponding index file.
- */
- private String restoreFilePath;
-
- private boolean isNewResource = false;
-
- public RestorableTsFileIOWriter(String processorName, String insertFilePath) throws IOException {
- super();
- this.insertFilePath = insertFilePath;
- this.restoreFilePath = insertFilePath + RESTORE_SUFFIX;
-
- this.metadatas = new HashMap<>();
-
- File insertFile = new File(insertFilePath);
- File restoreFile = new File(restoreFilePath);
- if (insertFile.exists() && restoreFile.exists()) {
- // read restore file
- Pair<Long, List<ChunkGroupMetaData>> restoreInfo = readRestoreInfo();
- long position = restoreInfo.left;
- List<ChunkGroupMetaData> existedMetadatas = restoreInfo.right;
- // cut off tsfile
- this.out = new DefaultTsFileOutput(new FileOutputStream(insertFile, true));
- out.truncate(position);
- this.chunkGroupMetaDataList = existedMetadatas;
- lastFlushedChunkGroupIndex = chunkGroupMetaDataList.size();
- append = new ArrayList<>();
- // recovery the metadata
- recoverMetadata(existedMetadatas);
- LOGGER.info(
- "Recover the bufferwrite processor {}, the tsfile seriesPath is {}, "
- + "the position of last flushed ChunkGroup is {}, the size of rowGroupMetadata is {}",
- processorName, insertFilePath, position, existedMetadatas.size());
- isNewResource = false;
- } else {
- try {
- Files.deleteIfExists(insertFile.toPath());
- } catch (IOException e) {
- LOGGER.info("remove unsealed tsfile failed: ", e);
- }
- try {
- Files.deleteIfExists(restoreFile.toPath());
- } catch (IOException e) {
- LOGGER.info("remove unsealed tsfile restore file failed: ", e);
- }
- this.out = new DefaultTsFileOutput(new FileOutputStream(insertFile));
- this.chunkGroupMetaDataList = new ArrayList<>();
- lastFlushedChunkGroupIndex = chunkGroupMetaDataList.size();
- append = new ArrayList<>();
- startFile();
- isNewResource = true;
- writeRestoreInfo();
- }
-
- }
-
- private void recoverMetadata(List<ChunkGroupMetaData> rowGroupMetaDatas) {
- // TODO it is better if we can consider the problem caused by deletion
- // and re-create time series here.
- for (ChunkGroupMetaData rowGroupMetaData : rowGroupMetaDatas) {
- String deviceId = rowGroupMetaData.getDeviceID();
- if (!metadatas.containsKey(deviceId)) {
- metadatas.put(deviceId, new HashMap<>());
- }
- for (ChunkMetaData chunkMetaData : rowGroupMetaData.getChunkMetaDataList()) {
- String measurementId = chunkMetaData.getMeasurementUid();
- if (!metadatas.get(deviceId).containsKey(measurementId)) {
- metadatas.get(deviceId).put(measurementId, new ArrayList<>());
- }
- metadatas.get(deviceId).get(measurementId).add(chunkMetaData);
- }
- }
- }
-
- public void writeRestoreInfo() throws IOException {
- long start = System.currentTimeMillis();
- long lastPosition = this.getPos();
- // TODO: no need to create a TsRowGroupBlockMetadata, flush RowGroupMetadata one by one is ok
- TsDeviceMetadata tsDeviceMetadata = new TsDeviceMetadata();
- this.getAppendedRowGroupMetadata();
- tsDeviceMetadata.setChunkGroupMetadataList(this.append);
-
- try (RandomAccessFile out = new RandomAccessFile(restoreFilePath, DEFAULT_MODE)) {
- if (out.length() > 0) {
- out.seek(out.length() - TS_POSITION_BYTE_SIZE);
- }
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- tsDeviceMetadata.serializeTo(baos);
- // insert metadata size using int
- int metadataSize = baos.size();
- out.write(BytesUtils.intToBytes(metadataSize));
- // insert metadata
- out.write(baos.toByteArray());
- // insert tsfile position using byte[8] which is a long
- byte[] lastPositionBytes = BytesUtils.longToBytes(lastPosition);
- out.write(lastPositionBytes);
- }
- LOGGER.info("writeRestoreInfo() cost {} ms", System.currentTimeMillis() - start);
- }
-
- /**
- * This is a private method. (It is default now for Unit Test only)
- *
- * @return a pair, whose left Long value is the tail position of the last complete Chunk Group in
- * the unsealed file's position, and the right List value is the ChunkGroupMetadata of all
- * complete Chunk Group in the same file.
- * @throws IOException if errors when reading restoreFile.
- */
- Pair<Long, List<ChunkGroupMetaData>> readRestoreInfo() throws IOException {
- byte[] lastPostionBytes = new byte[TS_POSITION_BYTE_SIZE];
- List<ChunkGroupMetaData> groupMetaDatas = new ArrayList<>();
- RandomAccessFile randomAccessFile = null;
- randomAccessFile = new RandomAccessFile(restoreFilePath, DEFAULT_MODE);
- try {
- long fileLength = randomAccessFile.length();
- // read tsfile position
- long point = randomAccessFile.getFilePointer();
- while (point + TS_POSITION_BYTE_SIZE < fileLength) {
- byte[] metadataSizeBytes = new byte[TS_METADATA_BYTE_SIZE];
- randomAccessFile.read(metadataSizeBytes);
- int metadataSize = BytesUtils.bytesToInt(metadataSizeBytes);
- byte[] thriftBytes = new byte[metadataSize];
- randomAccessFile.read(thriftBytes);
- ByteArrayInputStream inputStream = new ByteArrayInputStream(thriftBytes);
- TsDeviceMetadata tsDeviceMetadata = TsDeviceMetadata.deserializeFrom(inputStream);
- groupMetaDatas.addAll(tsDeviceMetadata.getChunkGroupMetaDataList());
- point = randomAccessFile.getFilePointer();
- }
- // read the tsfile position information using byte[8] which is a long.
- randomAccessFile.read(lastPostionBytes);
- long lastPosition = BytesUtils.bytesToLong(lastPostionBytes);
- return new Pair<>(lastPosition, groupMetaDatas);
- } finally {
- randomAccessFile.close();
- }
- }
-
- /**
- * get chunks' metadata from memory.
- *
- * @param deviceId the device id
- * @param measurementId the sensor id
- * @param dataType the value type
- * @return chunks' metadata
- */
- public List<ChunkMetaData> getMetadatas(String deviceId, String measurementId, TSDataType dataType) {
- List<ChunkMetaData> chunkMetaDatas = new ArrayList<>();
- if (metadatas.containsKey(deviceId) && metadatas.get(deviceId).containsKey(measurementId)) {
- for (ChunkMetaData chunkMetaData : metadatas.get(deviceId).get(measurementId)) {
- // filter: if a device'sensor is defined as float type, and data has been persistent.
- // Then someone deletes the timeseries and recreate it with Int type. We have to ignore
- // all the stale data.
- if (dataType.equals(chunkMetaData.getTsDataType())) {
- chunkMetaDatas.add(chunkMetaData);
- }
- }
- }
- return chunkMetaDatas;
- }
-
- String getInsertFilePath() {
- return insertFilePath;
- }
-
- public String getRestoreFilePath() {
- return restoreFilePath;
- }
-
- boolean isNewResource() {
- return isNewResource;
- }
-
- void setNewResource(boolean isNewResource) {
- this.isNewResource = isNewResource;
- }
-
- /**
- * add all appendChunkGroupMetadatas into memory. After calling this method, other classes can
- * read these metadata.
- */
- public void makeMetadataVisible() {
- if (!append.isEmpty()) {
- for (ChunkGroupMetaData rowGroupMetaData : append) {
- for (ChunkMetaData chunkMetaData : rowGroupMetaData.getChunkMetaDataList()) {
- addChunkMetadata(rowGroupMetaData.getDeviceID(), chunkMetaData.getMeasurementUid(),
- chunkMetaData);
- }
- }
- append.clear();
- }
- }
-
- private void addChunkMetadata(String deviceId, String measurementId,
- ChunkMetaData chunkMetaData) {
- if (!metadatas.containsKey(deviceId)) {
- metadatas.put(deviceId, new HashMap<>());
- }
- if (!metadatas.get(deviceId).containsKey(measurementId)) {
- metadatas.get(deviceId).put(measurementId, new ArrayList<>());
- }
- metadatas.get(deviceId).get(measurementId).add(chunkMetaData);
- }
-
- @Override
- public void endFile(FileSchema schema) throws IOException {
- super.endFile(schema);
- try {
- Files.delete(Paths.get(restoreFilePath));
- } catch (IOException e) {
- LOGGER.info("delete restore file {} failed, because {}", restoreFilePath, e.getMessage());
- }
- }
-
- /**
- * get all the chunkGroups' metadata which are appended after the last calling of this method, or
- * after the class instance is initialized if this is the first time to call the method.
- *
- * @return a list of chunkgroup metadata
- */
- private List<ChunkGroupMetaData> getAppendedRowGroupMetadata() {
- if (lastFlushedChunkGroupIndex < chunkGroupMetaDataList.size()) {
- append.clear();
- append.addAll(chunkGroupMetaDataList
- .subList(lastFlushedChunkGroupIndex, chunkGroupMetaDataList.size()));
- lastFlushedChunkGroupIndex = chunkGroupMetaDataList.size();
- }
- return append;
- }
-
- /**
- * see {@link java.nio.channels.FileChannel#truncate(long)}.
- */
- public void truncate(long position) throws IOException {
- out.truncate(position);
- }
-
- /**
- * just for test.
- *
- * @return the output
- */
- TsFileOutput getOutput() {
- return out;
- }
-
- public static int getTsPositionByteSize() {
- return TS_POSITION_BYTE_SIZE;
- }
-}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index d23ff73..2d1144e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -2021,7 +2021,7 @@
// closingBufferWriteProcessor.remove(processor);
// }
// }
-// closingBufferWriteProcessor.reset();
+// closingBufferWriteProcessor.clear();
// return closingBufferWriteProcessor;
// }
//
@@ -2125,6 +2125,6 @@
// //do nothing.
// }
// }
-// this.getClosingBufferWriteProcessor().reset();
+// this.getClosingBufferWriteProcessor().clear();
// }
//}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java
deleted file mode 100644
index 7c1eca4..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenode;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-/**
- * FileNodeProcessorStore is used to store information about FileNodeProcessor's status.
- * lastUpdateTime is changed and stored by BufferWrite flush or BufferWrite close.
- * emptyTsFileResource and newFileNodes are changed and stored by Overflow flush and
- * Overflow close. fileNodeProcessorState is changed and stored by the change of FileNodeProcessor's
- * status such as "work->merge merge->wait wait->work". numOfMergeFile is changed
- * and stored when FileNodeProcessor's status changes from work to merge.
- */
-public class FileNodeProcessorStore implements Serializable {
-
- private static final long serialVersionUID = -54525372941897565L;
-
- private boolean isOverflowed;
- private Map<String, Long> lastUpdateTimeMap;
- private TsFileResource emptyTsFileResource;
- private List<TsFileResource> newFileNodes;
- private int numOfMergeFile;
- private FileNodeProcessorStatus fileNodeProcessorStatus;
-
- /**
- * Constructor of FileNodeProcessorStore.
- *
- * @param isOverflowed whether this FileNode contains unmerged Overflow operations.
- * @param lastUpdateTimeMap the timestamp of last data point of each device in this FileNode.
- * @param emptyTsFileResource a place holder when the FileNode contains no TsFile.
- * @param newFileNodes TsFiles in the FileNode.
- * @param fileNodeProcessorStatus the status of the FileNode.
- * @param numOfMergeFile the number of files already merged in one merge operation.
- */
- public FileNodeProcessorStore(boolean isOverflowed, Map<String, Long> lastUpdateTimeMap,
- TsFileResource emptyTsFileResource,
- List<TsFileResource> newFileNodes,
- FileNodeProcessorStatus fileNodeProcessorStatus,
- int numOfMergeFile) {
- this.isOverflowed = isOverflowed;
- this.lastUpdateTimeMap = lastUpdateTimeMap;
- this.emptyTsFileResource = emptyTsFileResource;
- this.newFileNodes = newFileNodes;
- this.fileNodeProcessorStatus = fileNodeProcessorStatus;
- this.numOfMergeFile = numOfMergeFile;
- }
-
- public void serialize(OutputStream outputStream) throws IOException {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- ReadWriteIOUtils.write(this.isOverflowed, byteArrayOutputStream);
- // lastUpdateTimeMap
- ReadWriteIOUtils.write(lastUpdateTimeMap.size(), byteArrayOutputStream);
- for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), byteArrayOutputStream);
- ReadWriteIOUtils.write(entry.getValue(), byteArrayOutputStream);
- }
- this.emptyTsFileResource.serialize(byteArrayOutputStream);
- ReadWriteIOUtils.write(this.newFileNodes.size(), byteArrayOutputStream);
- for (TsFileResource tsFileResource : this.newFileNodes) {
- tsFileResource.serialize(byteArrayOutputStream);
- }
- ReadWriteIOUtils.write(this.numOfMergeFile, byteArrayOutputStream);
- ReadWriteIOUtils.write(this.fileNodeProcessorStatus.serialize(), byteArrayOutputStream);
- // buffer array to outputstream
- byteArrayOutputStream.writeTo(outputStream);
- }
-
- public static FileNodeProcessorStore deSerialize(InputStream inputStream) throws IOException {
- boolean isOverflowed = ReadWriteIOUtils.readBool(inputStream);
- Map<String, Long> lastUpdateTimeMap = new HashMap<>();
- int size = ReadWriteIOUtils.readInt(inputStream);
- for (int i = 0; i < size; i++) {
- String path = ReadWriteIOUtils.readString(inputStream);
- long time = ReadWriteIOUtils.readLong(inputStream);
- lastUpdateTimeMap.put(path, time);
- }
- TsFileResource emptyTsFileResource = TsFileResource.deSerialize(inputStream);
- size = ReadWriteIOUtils.readInt(inputStream);
- List<TsFileResource> newFileNodes = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- newFileNodes.add(TsFileResource.deSerialize(inputStream));
- }
- int numOfMergeFile = ReadWriteIOUtils.readInt(inputStream);
- FileNodeProcessorStatus fileNodeProcessorStatus = FileNodeProcessorStatus
- .deserialize(ReadWriteIOUtils.readShort(inputStream));
-
- return new FileNodeProcessorStore(isOverflowed, lastUpdateTimeMap, emptyTsFileResource,
- newFileNodes, fileNodeProcessorStatus, numOfMergeFile);
- }
-
- public boolean isOverflowed() {
- return isOverflowed;
- }
-
- public void setOverflowed(boolean isOverflowed) {
- this.isOverflowed = isOverflowed;
- }
-
- public FileNodeProcessorStatus getFileNodeProcessorStatus() {
- return fileNodeProcessorStatus;
- }
-
- public void setFileNodeProcessorStatus(FileNodeProcessorStatus fileNodeProcessorStatus) {
- this.fileNodeProcessorStatus = fileNodeProcessorStatus;
- }
-
- public Map<String, Long> getLastUpdateTimeMap() {
- return new HashMap<>(lastUpdateTimeMap);
- }
-
- public void setLastUpdateTimeMap(Map<String, Long> lastUpdateTimeMap) {
- this.lastUpdateTimeMap = lastUpdateTimeMap;
- }
-
- public TsFileResource getEmptyTsFileResource() {
- return emptyTsFileResource;
- }
-
- public void setEmptyTsFileResource(TsFileResource emptyTsFileResource) {
- this.emptyTsFileResource = emptyTsFileResource;
- }
-
- public List<TsFileResource> getNewFileNodes() {
- return newFileNodes;
- }
-
- public void setNewFileNodes(List<TsFileResource> newFileNodes) {
- this.newFileNodes = newFileNodes;
- }
-
- public int getNumOfMergeFile() {
- return numOfMergeFile;
- }
-
- public void setNumOfMergeFile(int numOfMergeFile) {
- this.numOfMergeFile = numOfMergeFile;
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java
deleted file mode 100644
index 48c8eee..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenode;
-
-/**
- * if a file is not changed by overflow, it's in NO_CHANGE;<br>
- * if it's changed and in NO_CHANGE previously, NO_CHANGE-->CHANGED, update file<br>
- * If it's changed and in CHANGED previously, and in merging, CHANGED-->MERGING_CHANGE, update file<br>
- * If it's changed and in CHANGED previously, and not in merging, do nothing<br>
- * After merging, if it's MERGING_CHANGE, MERGING_CHANGE-->CHANGED, otherwise in NO_CHANGE, MERGING_CHANGE-->NO_CHANGE
- */
-public enum OverflowChangeType {
- NO_CHANGE, CHANGED, MERGING_CHANGE;
-
- public short serialize() {
- switch (this) {
- case NO_CHANGE:
- return 0;
- case CHANGED:
- return 1;
- case MERGING_CHANGE:
- return 2;
- default:
- throw new IllegalStateException("Unsupported type");
- }
- }
-
- public static OverflowChangeType deserialize(short i) {
- switch (i) {
- case 0:
- return NO_CHANGE;
- case 1:
- return CHANGED;
- case 2:
- return MERGING_CHANGE;
- default:
- throw new IllegalArgumentException(
- String.format("Invalid input %d for OverflowChangeType", i));
- }
- }
-}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
deleted file mode 100644
index 57281c8..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenode;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Set;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
-import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-/**
- * This class is used to store one bufferwrite file status.<br>
- */
-public class TsFileResource {
-
- private OverflowChangeType overflowChangeType;
-
- //the file index of `settled` folder in the DirectoryManager.
- private int baseDirIndex;
- private File file;
- private Map<String, Long> startTimeMap;
- private Map<String, Long> endTimeMap;
- private Set<String> mergeChanged = new HashSet<>();
-
- private transient ModificationFile modFile;
-
- /**
- * @param autoRead whether read the file to initialize startTimeMap and endTimeMap
- */
- public TsFileResource(File file, boolean autoRead) throws IOException {
- this(new HashMap<>(), new HashMap<>(), OverflowChangeType.NO_CHANGE, file);
- if (autoRead) {
- //init startTime and endTime
- try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath())) {
- if (reader.readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
- //this is a complete tsfile, and we can read the metadata directly.
- for (Map.Entry<String, TsDeviceMetadataIndex> deviceEntry : reader.readFileMetadata()
- .getDeviceMap().entrySet()) {
- startTimeMap.put(deviceEntry.getKey(), deviceEntry.getValue().getStartTime());
- endTimeMap.put(deviceEntry.getKey(), deviceEntry.getValue().getEndTime());
- }
- } else {
- //sadly, this is not a complete tsfile. we have to repair it bytes by bytes
- //TODO will implement it
- List<ChunkGroupMetaData> metaDataList = new ArrayList<>();
- reader.selfCheck(null, metaDataList, false);
- initTimeMapFromChunGroupMetaDatas(metaDataList);
- }
- }
- }
- }
-
- /**
- * @param writer an unclosed TsFile Writer
- */
- public TsFileResource(File file, RestorableTsFileIOWriter writer) {
- this(new HashMap<>(), new HashMap<>(), OverflowChangeType.NO_CHANGE, file);
- initTimeMapFromChunGroupMetaDatas(writer.getChunkGroupMetaDatas());
- }
-
- private void initTimeMapFromChunGroupMetaDatas(List<ChunkGroupMetaData> metaDataList) {
- for (ChunkGroupMetaData metaData : metaDataList) {
- long startTime = startTimeMap.getOrDefault(metaData.getDeviceID(), Long.MAX_VALUE);
- long endTime = endTimeMap.getOrDefault(metaData.getDeviceID(), Long.MIN_VALUE);
- for (ChunkMetaData chunk : metaData.getChunkMetaDataList()) {
- if (chunk.getStartTime() < startTime) {
- startTime = chunk.getStartTime();
- }
- if (chunk.getEndTime() > endTime) {
- endTime = chunk.getEndTime();
- }
- }
- startTimeMap.put(metaData.getDeviceID(), startTime);
- endTimeMap.put(metaData.getDeviceID(), endTime);
- }
- }
-
-
- public TsFileResource(Map<String, Long> startTimeMap, Map<String, Long> endTimeMap,
- OverflowChangeType type, File file) {
-
- this.overflowChangeType = type;
- if (file != null) {
- this.baseDirIndex = DirectoryManager.getInstance()
- .getTsFileFolderIndex(file.getParentFile().getParent());
- this.modFile = new ModificationFile(file.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
- }
- this.file = file;
-
- this.startTimeMap = startTimeMap;
- this.endTimeMap = endTimeMap;
-
- }
-
- public void serialize(OutputStream outputStream) throws IOException {
- ReadWriteIOUtils.write(this.overflowChangeType.serialize(), outputStream);
- ReadWriteIOUtils.write(this.baseDirIndex, outputStream);
- ReadWriteIOUtils.writeIsNull(this.file, outputStream);
- if (this.file != null) {
- ReadWriteIOUtils.write(getRelativePath(), outputStream);
- }
- ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream);
- for (Entry<String, Long> entry : this.startTimeMap.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), outputStream);
- ReadWriteIOUtils.write(entry.getValue(), outputStream);
- }
- ReadWriteIOUtils.write(this.endTimeMap.size(), outputStream);
- for (Entry<String, Long> entry : this.endTimeMap.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), outputStream);
- ReadWriteIOUtils.write(entry.getValue(), outputStream);
- }
- ReadWriteIOUtils.write(mergeChanged.size(), outputStream);
- for (String mergeChangedElement : this.mergeChanged) {
- ReadWriteIOUtils.write(mergeChangedElement, outputStream);
- }
- }
-
- public static TsFileResource deSerialize(InputStream inputStream) throws IOException {
- OverflowChangeType overflowChangeType = OverflowChangeType
- .deserialize(ReadWriteIOUtils.readShort(inputStream));
- int baseDirIndex = ReadWriteIOUtils.readInt(inputStream);
- boolean hasRelativePath = ReadWriteIOUtils.readIsNull(inputStream);
-
- File file = null;
- if (hasRelativePath) {
- String relativePath = ReadWriteIOUtils.readString(inputStream);
- file = new File(DirectoryManager.getInstance().getTsFileFolder(baseDirIndex), relativePath);
- }
- int size = ReadWriteIOUtils.readInt(inputStream);
- Map<String, Long> startTimes = new HashMap<>();
- for (int i = 0; i < size; i++) {
- String path = ReadWriteIOUtils.readString(inputStream);
- long time = ReadWriteIOUtils.readLong(inputStream);
- startTimes.put(path, time);
- }
- size = ReadWriteIOUtils.readInt(inputStream);
- Map<String, Long> endTimes = new HashMap<>();
- for (int i = 0; i < size; i++) {
- String path = ReadWriteIOUtils.readString(inputStream);
- long time = ReadWriteIOUtils.readLong(inputStream);
- endTimes.put(path, time);
- }
- size = ReadWriteIOUtils.readInt(inputStream);
- Set<String> mergeChanaged = new HashSet<>();
- for (int i = 0; i < size; i++) {
- String path = ReadWriteIOUtils.readString(inputStream);
- mergeChanaged.add(path);
- }
- TsFileResource tsFileResource = new TsFileResource(startTimes, endTimes, overflowChangeType, file);
- tsFileResource.mergeChanged = mergeChanaged;
- return tsFileResource;
- }
-
-
- public void setStartTime(String deviceId, long startTime) {
-
- startTimeMap.put(deviceId, startTime);
- }
-
- public long getStartTime(String deviceId) {
-
- if (startTimeMap.containsKey(deviceId)) {
- return startTimeMap.get(deviceId);
- } else {
- return -1;
- }
- }
-
- public void removeStartTime(String deviceId) {
- startTimeMap.remove(deviceId);
- }
-
- public Map<String, Long> getStartTimeMap() {
-
- return startTimeMap;
- }
-
- public void setStartTimeMap(Map<String, Long> startTimeMap) {
-
- this.startTimeMap = startTimeMap;
- }
-
- public void setEndTime(String deviceId, long timestamp) {
-
- this.endTimeMap.put(deviceId, timestamp);
- }
-
- public long getEndTime(String deviceId) {
-
- if (endTimeMap.get(deviceId) == null) {
- return -1;
- }
- return endTimeMap.get(deviceId);
- }
-
- public Map<String, Long> getEndTimeMap() {
-
- return endTimeMap;
- }
-
- public void setEndTimeMap(Map<String, Long> endTimeMap) {
-
- this.endTimeMap = endTimeMap;
- }
-
- public void removeTime(String deviceId) {
-
- startTimeMap.remove(deviceId);
- endTimeMap.remove(deviceId);
- }
-
-
- public File getFile() {
- return file;
- }
-
-
- public int getBaseDirIndex() {
- return baseDirIndex;
- }
-
- public boolean checkEmpty() {
-
- return startTimeMap.isEmpty() && endTimeMap.isEmpty();
- }
-
- public void clear() {
- startTimeMap.clear();
- endTimeMap.clear();
- mergeChanged.clear();
- overflowChangeType = OverflowChangeType.NO_CHANGE;
- }
-
- public void changeTypeToChanged(FileNodeProcessorStatus fileNodeProcessorState) {
-
- if (fileNodeProcessorState == FileNodeProcessorStatus.MERGING_WRITE) {
- overflowChangeType = OverflowChangeType.MERGING_CHANGE;
- } else {
- overflowChangeType = OverflowChangeType.CHANGED;
- }
- }
-
- public void addMergeChanged(String deviceId) {
-
- mergeChanged.add(deviceId);
- }
-
- public Set<String> getMergeChanged() {
-
- return mergeChanged;
- }
-
- public void clearMergeChanged() {
-
- mergeChanged.clear();
- }
-
- public boolean isClosed() {
-
- return !endTimeMap.isEmpty();
-
- }
-
- public TsFileResource backUp() {
-
- Map<String, Long> startTimeMapCopy = new HashMap<>(this.startTimeMap);
- Map<String, Long> endTimeMapCopy = new HashMap<>(this.endTimeMap);
- return new TsFileResource(startTimeMapCopy,
- endTimeMapCopy, overflowChangeType, file);
- }
-
- public Set<String> getDevices() {
- return this.startTimeMap.keySet();
- }
-
- @Override
- public int hashCode() {
-
- final int prime = 31;
- int result = 1;
- result = prime * result + ((endTimeMap == null) ? 0 : endTimeMap.hashCode());
- result = prime * result + ((file == null) ? 0 : file.hashCode());
- result = prime * result + ((overflowChangeType == null) ? 0 : overflowChangeType.hashCode());
- result = prime * result + ((startTimeMap == null) ? 0 : startTimeMap.hashCode());
- return result;
- }
-
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof TsFileResource)) {
- return false;
- }
- TsFileResource that = (TsFileResource) o;
- return baseDirIndex == that.baseDirIndex &&
- overflowChangeType == that.overflowChangeType &&
- Objects.equals(file, that.file) &&
- Objects.equals(startTimeMap, that.startTimeMap) &&
- Objects.equals(endTimeMap, that.endTimeMap) &&
- Objects.equals(mergeChanged, that.mergeChanged) &&
- Objects.equals(modFile, that.modFile);
- }
-
-
- public OverflowChangeType getOverflowChangeType() {
- return overflowChangeType;
- }
-
- public void setOverflowChangeType(OverflowChangeType overflowChangeType) {
- this.overflowChangeType = overflowChangeType;
- }
-
- public synchronized ModificationFile getModFile() {
- if (modFile == null) {
- modFile = new ModificationFile(file.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
- }
- return modFile;
- }
-
- public boolean containsDevice(String deviceId) {
- return startTimeMap.containsKey(deviceId);
- }
-
- public void setModFile(ModificationFile modFile) {
- this.modFile = modFile;
- }
-
- public void close() throws IOException {
- modFile.close();
- }
-
- public String getRelativePath() {
- if (file == null) {
- return null;
- }
- return this.getFile().getParentFile().getName() + File.separator + this.getFile().getName();
- }
-
- public void setFile(File file) throws IOException {
- this.file = file;
- if (file != null) {
- this.baseDirIndex = DirectoryManager.getInstance()
- .getTsFileFolderIndex(file.getParentFile().getParent());
- if (this.modFile != null) {
- this.modFile.close();
- }
- this.modFile = new ModificationFile(file.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
- }
- }
-
- public String getFilePath() {
- return this.getFile().getAbsolutePath();
- }
-
- public void updateTime(String deviceId, long time) {
- startTimeMap.putIfAbsent(deviceId, time);
- Long endTime = endTimeMap.get(deviceId);
- if (endTime == null || endTime < time) {
- endTimeMap.put(deviceId, time);
- }
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index 81e6cb7..505fd83 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -29,16 +29,12 @@ import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
-import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.FileNodeProcessorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.service.IService;
@@ -265,7 +261,7 @@ public class FileNodeManagerV2 implements IService {
* @param fileNodeName the seriesPath of storage group
* @param appendFile the appended tsfile information
*/
- public boolean appendFileToFileNode(String fileNodeName, TsFileResource appendFile,
+ public boolean appendFileToFileNode(String fileNodeName, TsFileResourceV2 appendFile,
String appendFilePath) throws FileNodeManagerException {
// TODO
return true;
@@ -277,7 +273,7 @@ public class FileNodeManagerV2 implements IService {
* @param fileNodeName the seriesPath of storage group
* @param appendFile the appended tsfile information
*/
- public List<String> getOverlapFilesFromFileNode(String fileNodeName, TsFileResource appendFile,
+ public List<String> getOverlapFilesFromFileNode(String fileNodeName, TsFileResourceV2 appendFile,
String uuid) throws FileNodeManagerException {
// TODO
return null;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index 9d77040..1ca080f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -84,6 +84,15 @@ public class TsFileResourceV2 {
public TsFileResourceV2(File file,
Map<String, Long> startTimeMap,
+ Map<String, Long> endTimeMap) {
+ this.file = file;
+ this.startTimeMap = startTimeMap;
+ this.endTimeMap = endTimeMap;
+ this.closed = true;
+ }
+
+ public TsFileResourceV2(File file,
+ Map<String, Long> startTimeMap,
Map<String, Long> endTimeMap,
ReadOnlyMemChunk readOnlyMemChunk,
List<ChunkMetaData> chunkMetaDatas) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 7efe110..d0dca77 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -444,11 +444,15 @@ public class UnsealedTsFileProcessorV2 {
continue;
}
ReadOnlyMemChunk memChunk = flushingMemTable.query(deviceId, measurementId, dataType, props);
- memSeriesLazyMerger.addMemSeries(memChunk);
+ if (memChunk != null) {
+ memSeriesLazyMerger.addMemSeries(memChunk);
+ }
}
if (workMemTable != null) {
ReadOnlyMemChunk memChunk = workMemTable.query(deviceId, measurementId, dataType, props);
- memSeriesLazyMerger.addMemSeries(memChunk);
+ if (memChunk != null) {
+ memSeriesLazyMerger.addMemSeries(memChunk);
+ }
}
// memSeriesLazyMerger has handled the props,
// so we do not need to handle it again in the following readOnlyMemChunk
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 73ad538..f753c20 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -140,7 +140,7 @@ public abstract class AbstractMemTable implements IMemTable {
Map<String, String> props) {
TimeValuePairSorter sorter;
if (!checkPath(deviceId, measurement)) {
- sorter = new WritableMemChunk(dataType);
+ return null;
} else {
long undeletedTime = findUndeletedTime(deviceId, measurement);
IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/DeduplicatedSortedData.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/DeduplicatedSortedData.java
deleted file mode 100644
index 5f67970..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/DeduplicatedSortedData.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memtable;
-
-import java.util.List;
-import org.apache.iotdb.db.utils.TimeValuePair;
-
-public class DeduplicatedSortedData {
- private List<TimeValuePair> timeValuePairs;
-
- private int index;
-
- private int length;
-
- private TimeValuePair cachedTimeValuePair;
-
- private boolean hasCached;
-
- public DeduplicatedSortedData(List<TimeValuePair> timeValuePairs) {
- this.timeValuePairs = timeValuePairs;
- this.timeValuePairs.sort(TimeValuePair::compareTo);
- this.index = 0;
- this.length = timeValuePairs.size();
- }
-
- public boolean hasNext(){
- if(!hasCached) {
- cachedTimeValuePair = null;
- while (index < length) {
- if (cachedTimeValuePair == null || cachedTimeValuePair.getTimestamp() == timeValuePairs
- .get(index).getTimestamp()) {
- cachedTimeValuePair = timeValuePairs.get(index++);
- hasCached = true;
- } else {
- break;
- }
- }
- }
- return hasCached;
- }
-
- public TimeValuePair next(){
- if(!hasCached){
- hasNext();
- }
- hasCached = false;
- return cachedTimeValuePair;
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 3296ab2..0d4c280 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -54,13 +54,6 @@ public interface IWritableMemChunk extends TimeValuePairSorter {
void releasePrimitiveArrayList();
/**
- * be used when flushing data on disk.
- * this method will remove duplicated data and sort them.
- * @return
- */
- default DeduplicatedSortedData getDeduplicatedSortedData(){return null;}
-
- /**
* served for query requests.
* @return
*/
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushCallBack.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushCallBack.java
deleted file mode 100644
index ac24a07..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushCallBack.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memtable;
-
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
-@FunctionalInterface
-public interface MemTableFlushCallBack {
-
- void afterFlush(IMemTable memTable, TsFileIOWriter tsFileIOWriter);
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
deleted file mode 100644
index a2999b3..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with the License. You may obtain
- * a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memtable;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
-import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
-import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemTableFlushTask {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
- private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
-
- private TsFileIOWriter tsFileIoWriter;
-
- private ConcurrentLinkedQueue ioTaskQueue = new ConcurrentLinkedQueue();
- private ConcurrentLinkedQueue memoryTaskQueue = new ConcurrentLinkedQueue();
- private boolean stop = false;
- private String processorName;
- private long flushId;
-
- private MemTableFlushCallBack flushCallBack;
- private IMemTable memTable;
-
- public MemTableFlushTask(TsFileIOWriter writer, String processorName, long flushId,
- MemTableFlushCallBack callBack) {
- this.tsFileIoWriter = writer;
- this.processorName = processorName;
- this.flushId = flushId;
- this.flushCallBack = callBack;
- ioFlushThread.start();
- memoryFlushThread.start();
- LOGGER.info("Processor {} flush task created, flushId: {}", processorName, flushId);
- }
-
-
- private Thread memoryFlushThread = new Thread(() -> {
- long memSerializeTime = 0;
- LOGGER.info(
- "BufferWrite Processor {},start serialize data into mem.", processorName);
- while (!stop) {
- Object task = memoryTaskQueue.poll();
- if (task == null) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOGGER.error("BufferWrite Processor {}, io flush task is interrupted.", processorName, e);
- }
- } else {
- if (task instanceof String) {
- ioTaskQueue.add(task);
- } else if (task instanceof ChunkGroupIoTask) {
- ioTaskQueue.add(task);
- } else {
- long starTime = System.currentTimeMillis();
- Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
- ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
- IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
- PAGE_SIZE_THRESHOLD);
- try {
- writeOneSeries(memorySerializeTask.left, seriesWriter,
- memorySerializeTask.right.getType());
- ioTaskQueue.add(seriesWriter);
- } catch (IOException e) {
- LOGGER.error("BufferWrite Processor {}, io error.", processorName, e);
- throw new RuntimeException(e);
- }
- memSerializeTime += System.currentTimeMillis() - starTime;
- }
- }
- }
- LOGGER.info(
- "BufferWrite Processor {},flushing a memtable into disk: serialize data into mem cost {} ms.",
- processorName, memSerializeTime);
- });
-
-
- //TODO more better way is: for each TsFile, assign it a Executors.singleThreadPool,
- // rather than per each memtable.
- private Thread ioFlushThread = new Thread(() -> {
- long ioTime = 0;
- long lastWaitIdx = 0;
- long currentTsFileFlushId;
- LOGGER.info("BufferWrite Processor {}, start io cost.", processorName);
- long waitStartTime = System.currentTimeMillis();
-// while ((currentTsFileFlushId = tsFileIoWriter.getFlushID().get()) != flushId) {
-// try {
-// long waitedTime = System.currentTimeMillis() - waitStartTime;
-// long currWaitIdx = waitedTime / 2000;
-// if (currWaitIdx > lastWaitIdx) {
-// lastWaitIdx = currWaitIdx;
-// LOGGER.info("tsFileIoWriter flushID: {}, flush task flushID: {} has waited {}ms", currentTsFileFlushId,
-// flushId, waitedTime);
-// }
-// Thread.sleep(10);
-// } catch (InterruptedException e) {
-// LOGGER.error("Processor {}, last flush io task is not finished.", processorName, e);
-// }
-// }
- while (!stop) {
- Object seriesWriterOrEndChunkGroupTask = ioTaskQueue.poll();
- if (seriesWriterOrEndChunkGroupTask == null) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOGGER.error("BufferWrite Processor {}, io flush task is interrupted.", processorName, e);
- }
- } else {
- long starTime = System.currentTimeMillis();
- try {
- if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
- ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
- } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
- tsFileIoWriter.startChunkGroup((String) seriesWriterOrEndChunkGroupTask);
- } else {
- ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
- tsFileIoWriter.endChunkGroup(task.version);
- task.finished = true;
- }
- } catch (IOException e) {
- LOGGER.error("BufferWrite Processor {}, io error.", processorName, e);
- throw new RuntimeException(e);
- }
- ioTime += System.currentTimeMillis() - starTime;
- }
- }
-
- MemTablePool.getInstance().putBack(memTable);
- LOGGER.info("Processor {} return back a memtable to MemTablePool", processorName);
- if (tsFileIoWriter instanceof RestorableTsFileIOWriter) {
- try {
- ((RestorableTsFileIOWriter) tsFileIoWriter).writeRestoreInfo();
- } catch (IOException e) {
- LOGGER.error("insert restore file meet error", e);
- }
- }
- flushCallBack.afterFlush(memTable, tsFileIoWriter);
-
-
- // enable next flush task to IO
-// long newId = tsFileIoWriter.getFlushID().incrementAndGet();
-// LOGGER.info("BufferWrite Processor {}, flushing a memtable into disk: io cost {}ms, new flushID in tsFileIoWriter: {}.",
-// processorName, ioTime, newId);
- });
-
-
- private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
- TSDataType dataType)
- throws IOException {
- for (TimeValuePair timeValuePair : tvPairs) {
- switch (dataType) {
- case BOOLEAN:
- seriesWriterImpl
- .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
- break;
- case INT32:
- seriesWriterImpl.write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getInt());
- break;
- case INT64:
- seriesWriterImpl.write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getLong());
- break;
- case FLOAT:
- seriesWriterImpl.write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getFloat());
- break;
- case DOUBLE:
- seriesWriterImpl
- .write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getDouble());
- break;
- case TEXT:
- seriesWriterImpl
- .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
- break;
- default:
- LOGGER.error("BufferWrite Processor {}, don't support data type: {}", processorName,
- dataType);
- break;
- }
- }
- }
-
- /**
- * the function for flushing memtable.
- */
- public void flushMemTable(FileSchema fileSchema, IMemTable imemTable, long version) {
- long sortTime = 0;
- ChunkGroupIoTask theLastTask = EMPTY_TASK;
- this.memTable = imemTable;
- for (String deviceId : imemTable.getMemTableMap().keySet()) {
- memoryTaskQueue.add(deviceId);
- int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
- for (String measurementId : imemTable.getMemTableMap().get(deviceId).keySet()) {
- long startTime = System.currentTimeMillis();
- // TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
- IWritableMemChunk series = imemTable.getMemTableMap().get(deviceId).get(measurementId);
- MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
- List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
- sortTime += System.currentTimeMillis() - startTime;
- memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
- }
- theLastTask = new ChunkGroupIoTask(seriesNumber, deviceId, version);
- memoryTaskQueue.add(theLastTask);
- }
- LOGGER.info(
- "BufferWrite Processor {}, flushing a memtable into disk: data sort time cost {} ms.",
- processorName, sortTime);
- while (!theLastTask.finished) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOGGER.error("BufferWrite Processor {}, flush memtable table thread is interrupted.",
- processorName, e);
- throw new RuntimeException(e);
- }
- }
- stop = true;
- while (ioFlushThread.isAlive()) {
- // wait until the after works are done
- }
- }
-
-
- static class ChunkGroupIoTask {
-
- int seriesNumber;
- String deviceId;
- long version;
- boolean finished;
-
- public ChunkGroupIoTask(int seriesNumber, String deviceId, long version) {
- this(seriesNumber, deviceId, version, false);
- }
-
- public ChunkGroupIoTask(int seriesNumber, String deviceId, long version, boolean finished) {
- this.seriesNumber = seriesNumber;
- this.deviceId = deviceId;
- this.version = version;
- this.finished = finished;
- }
- }
-
- private static ChunkGroupIoTask EMPTY_TASK = new ChunkGroupIoTask(0, "", 0, true);
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
deleted file mode 100644
index 5c06ffa..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memtable;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
-import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
-import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemTableFlushUtil {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushUtil.class);
- private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
-
- private MemTableFlushUtil() {
-
- }
-
- private static void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
- TSDataType dataType)
- throws IOException {
- for (TimeValuePair timeValuePair : tvPairs) {
- switch (dataType) {
- case BOOLEAN:
- seriesWriterImpl
- .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
- break;
- case INT32:
- seriesWriterImpl.write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getInt());
- break;
- case INT64:
- seriesWriterImpl.write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getLong());
- break;
- case FLOAT:
- seriesWriterImpl.write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getFloat());
- break;
- case DOUBLE:
- seriesWriterImpl
- .write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getDouble());
- break;
- case TEXT:
- seriesWriterImpl
- .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
- break;
- default:
- LOGGER.error("don't support data type: {}", dataType);
- break;
- }
- }
- }
-
- /**
- * the function for flushing memtable.
- */
- public static void flushMemTable(FileSchema fileSchema, TsFileIOWriter tsFileIoWriter,
- IMemTable imemTable, long version) throws IOException {
- long tmpTime;
- long sortTime = 0;
- long memSerializeTime = 0;
- long ioTime = 0;
- for (String deviceId : imemTable.getMemTableMap().keySet()) {
- long startPos = tsFileIoWriter.getPos();
- tsFileIoWriter.startChunkGroup(deviceId);
- int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
- for (String measurementId : imemTable.getMemTableMap().get(deviceId).keySet()) {
- long startTime = System.currentTimeMillis();
- // TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
- IWritableMemChunk series = imemTable.getMemTableMap().get(deviceId).get(measurementId);
- MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
- List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
- tmpTime = System.currentTimeMillis();
- sortTime += tmpTime - startTime;
- ChunkBuffer chunkBuffer = new ChunkBuffer(desc);
- IChunkWriter seriesWriter = new ChunkWriterImpl(desc, chunkBuffer, PAGE_SIZE_THRESHOLD);
- writeOneSeries(sortedTimeValuePairs, seriesWriter, desc.getType());
- startTime = System.currentTimeMillis();
- memSerializeTime += startTime - tmpTime;
- seriesWriter.writeToFileWriter(tsFileIoWriter);
- ioTime += System.currentTimeMillis() - startTime;
- }
- tmpTime = System.currentTimeMillis();
- tsFileIoWriter.endChunkGroup(version);
- ioTime += System.currentTimeMillis() - tmpTime;
- }
- LOGGER.info(
- "flushing a memtable into disk: data sort time cost {} ms, serialize data into mem cost {} ms, io cost {} ms.",
- sortTime , memSerializeTime , ioTime );
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
deleted file mode 100644
index 83ec8bd..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memtable;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.iotdb.db.utils.PrimitiveArrayListV2;
-import org.apache.iotdb.db.utils.PrimitiveDataListPool;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.db.utils.TsPrimitiveType;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
-
-public class WritableMemChunk implements IWritableMemChunk {
-
- private TSDataType dataType;
- private PrimitiveArrayListV2 list;
- private long timeOffset = 0;
-
- public WritableMemChunk(TSDataType dataType) {
- this.dataType = dataType;
- this.list = getPrimitiveDataListByDataType(dataType);
- }
-
- public PrimitiveArrayListV2 getPrimitiveDataListByDataType(TSDataType dataType) {
- switch (dataType) {
- case BOOLEAN:
- return new PrimitiveArrayListV2(boolean.class);
- case INT32:
- return new PrimitiveArrayListV2(int.class);
- case INT64:
- return new PrimitiveArrayListV2(long.class);
- case FLOAT:
- return new PrimitiveArrayListV2(float.class);
- case DOUBLE:
- return new PrimitiveArrayListV2(double.class);
- case TEXT:
- return new PrimitiveArrayListV2(Binary.class);
- default:
- throw new UnSupportedDataTypeException("DataType: " + dataType);
- }
- }
-
- @Override
- public void write(long insertTime, String insertValue) {
- switch (dataType) {
- case BOOLEAN:
- putBoolean(insertTime, Boolean.valueOf(insertValue));
- break;
- case INT32:
- putInt(insertTime, Integer.valueOf(insertValue));
- break;
- case INT64:
- putLong(insertTime, Long.valueOf(insertValue));
- break;
- case FLOAT:
- putFloat(insertTime, Float.valueOf(insertValue));
- break;
- case DOUBLE:
- putDouble(insertTime, Double.valueOf(insertValue));
- break;
- case TEXT:
- putBinary(insertTime, Binary.valueOf(insertValue));
- break;
- default:
- throw new UnSupportedDataTypeException("Unsupported data type:" + dataType);
- }
- }
-
- @Override
- public void write(long insertTime, Object value) {
- switch (dataType) {
- case BOOLEAN:
- putBoolean(insertTime, (Boolean)value);
- break;
- case INT32:
- putInt(insertTime, (Integer)value);
- break;
- case INT64:
- putLong(insertTime, (Long)value);
- break;
- case FLOAT:
- putFloat(insertTime, (Float)value);
- break;
- case DOUBLE:
- putDouble(insertTime, (Double)value);
- break;
- case TEXT:
- putBinary(insertTime, (Binary)value);
- break;
- default:
- throw new UnSupportedDataTypeException("Unsupported data type:" + dataType);
- }
- }
-
-
- @Override
- public void putLong(long t, long v) {
- list.putTimestamp(t, v);
- }
-
- @Override
- public void putInt(long t, int v) {
- list.putTimestamp(t, v);
- }
-
- @Override
- public void putFloat(long t, float v) {
- list.putTimestamp(t, v);
- }
-
- @Override
- public void putDouble(long t, double v) {
- list.putTimestamp(t, v);
- }
-
- @Override
- public void putBinary(long t, Binary v) {
- list.putTimestamp(t, v);
- }
-
- @Override
- public void putBoolean(long t, boolean v) {
- list.putTimestamp(t, v);
- }
-
- @Override
- // TODO: Consider using arrays to sort and remove duplicates
- public List<TimeValuePair> getSortedTimeValuePairList() {
- int length = list.getTotalDataNumber();
- Map<Long, TsPrimitiveType> map = new HashMap<>(length, 1.0f);
- for (int i = 0; i < length; i++) {
- if (list.getTimestamp(i) >= timeOffset) {
- map.put(list.getTimestamp(i), TsPrimitiveType.getByType(dataType, list.getValue(i)));
- }
- }
- List<TimeValuePair> ret = new ArrayList<>(map.size());
- map.forEach((k, v) -> ret.add(new TimeValuePairInMemTable(k, v)));
- ret.sort(TimeValuePair::compareTo);
- return ret;
- }
-
- @Override
- public DeduplicatedSortedData getDeduplicatedSortedData() {
- int length = list.getTotalDataNumber();
- List<TimeValuePair> data = new ArrayList<>(length);
- for (int i = 0; i < length; i++) {
- if (list.getTimestamp(i) >= timeOffset) {
- data.add(new TimeValuePairInMemTable(list.getTimestamp(i), TsPrimitiveType.getByType(dataType, list.getValue(i))));
- }
- }
- return new DeduplicatedSortedData(data);
- }
-
- @Override
- public long count() {
- return list.getTotalDataNumber();
- }
-
- @Override
- public TSDataType getType() {
- return dataType;
- }
-
- @Override
- public void setTimeOffset(long offset) {
- timeOffset = offset;
- }
-
- @Override
- public void releasePrimitiveArrayList(){
- PrimitiveDataListPool.getInstance().release(list);
- }
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java
deleted file mode 100644
index de50efc..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.io;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-
-import org.apache.iotdb.tsfile.read.reader.TsFileInput;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
-
-public class OverflowIO extends TsFileIOWriter {
-
- private OverflowReadWriter overflowReadWriter;
-
- public OverflowIO(OverflowReadWriter overflowReadWriter) throws IOException {
- super(overflowReadWriter, new ArrayList<>());
- this.overflowReadWriter = overflowReadWriter;
- toTail();
- }
-
- public void clearRowGroupMetadatas() {
- super.chunkGroupMetaDataList.clear();
- }
-
- @Override
- public long getPos() throws IOException {
- return overflowReadWriter.getPosition();
- }
-
- public void toTail() throws IOException {
- overflowReadWriter.toTail();
- }
-
- public void close() throws IOException {
- super.close();
- overflowReadWriter.close();
- }
-
- public void flush() throws IOException {
- overflowReadWriter.flush();
- }
-
- public TsFileInput getReader() {
- return overflowReadWriter;
- }
-
- public TsFileOutput getWriter() {
- return overflowReadWriter;
- }
-
- public OutputStream getOutputStream() {
- return overflowReadWriter;
- }
-
- public static class OverflowReadWriter extends OutputStream implements TsFileOutput, TsFileInput {
-
- private static final String RW_MODE = "rw";
- private RandomAccessFile raf;
-
- public OverflowReadWriter(String filepath) throws FileNotFoundException {
- this.raf = new RandomAccessFile(filepath, RW_MODE);
- }
-
- public void toTail() throws IOException {
- raf.seek(raf.length());
- }
-
- @Override
- public long size() throws IOException {
- return raf.length();
- }
-
- @Override
- public long position() throws IOException {
- return raf.getFilePointer();
- }
-
- @Override
- public TsFileInput position(long newPosition) throws IOException {
- raf.seek(newPosition);
- return this;
- }
-
- @Override
- public int read(ByteBuffer dst) throws IOException {
- return raf.getChannel().read(dst);
- }
-
- @Override
- public int read(ByteBuffer dst, long position) throws IOException {
- return raf.getChannel().read(dst, position);
- }
-
- @Override
- public FileChannel wrapAsFileChannel() throws IOException {
- return raf.getChannel();
- }
-
- @Override
- public InputStream wrapAsInputStream() throws IOException {
- return Channels.newInputStream(raf.getChannel());
- }
-
- @Override
- public int read() throws IOException {
- return raf.read();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- raf.readFully(b, off, len);
- return len;
- }
-
- @Override
- public int readInt() throws IOException {
- return raf.readInt();
- }
-
- @Override
- public void write(ByteBuffer b) throws IOException {
- raf.getChannel().write(b);
- }
-
- @Override
- public long getPosition() throws IOException {
- return raf.getFilePointer();
- }
-
- @Override
- public OutputStream wrapAsStream() throws IOException {
- return Channels.newOutputStream(raf.getChannel());
- }
-
- @Override
- public void truncate(long position) throws IOException {
- raf.getChannel().truncate(position);
- }
-
- @Override
- public void write(int b) throws IOException {
- raf.write(b);
- }
-
- @Override
- public void close() throws IOException {
- raf.close();
- }
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
deleted file mode 100644
index ade2be6..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.io;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushCallBack;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.db.utils.QueryUtils;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OverflowResource {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(OverflowResource.class);
-
- private static final String INSERT_FILE_NAME = "unseqTsFile";
- private static final String POSITION_FILE_NAME = "positionFile";
-
- private static final int FOOTER_LENGTH = 4;
- private static final int POS_LENGTH = 8;
- private String parentPath;
- private String dataPath;
- private String insertFilePath;
- private String positionFilePath;
- private File insertFile;
- private OverflowIO insertIO;
- private Map<String, Map<String, List<ChunkMetaData>>> insertMetadatas;
- private List<ChunkGroupMetaData> appendInsertMetadatas;
- private VersionController versionController;
- private ModificationFile modificationFile;
- private WriteLogNode logNode;
- private String processorName;
-
- public OverflowResource(String parentPath, String dataPath, VersionController versionController
- , String processorName)
- throws IOException {
- this.insertMetadatas = new HashMap<>();
- this.appendInsertMetadatas = new ArrayList<>();
- this.parentPath = parentPath;
- this.dataPath = dataPath;
- this.processorName = processorName;
- File dataFile = new File(parentPath, dataPath);
- if (!dataFile.exists()) {
- dataFile.mkdirs();
- }
- insertFile = new File(dataFile, INSERT_FILE_NAME);
- insertFilePath = insertFile.getPath();
- positionFilePath = new File(dataFile, POSITION_FILE_NAME).getPath();
- this.versionController = versionController;
- modificationFile = new ModificationFile(insertFilePath + ModificationFile.FILE_SUFFIX);
-
- Pair<Long, Long> position = readPositionInfo();
- try {
- // insert stream
- OverflowIO.OverflowReadWriter readWriter = new OverflowIO.OverflowReadWriter(insertFilePath);
- // truncate
- readWriter.wrapAsFileChannel().truncate(position.left);
- // reposition
- // seek to zero
- readWriter.close();
- // seek to tail
- // the tail is at least the len of magic string
- readWriter = new OverflowIO.OverflowReadWriter(insertFilePath);
- insertIO = new OverflowIO(readWriter);
- readMetadata();
- } catch (FileNotFoundException e){
- LOGGER.debug("Failed to construct the OverflowIO.", e);
- }
- }
-
- private Pair<Long, Long> readPositionInfo() throws IOException {
- File positionFile = new File(positionFilePath);
- if (positionFile.exists()) {
- try(FileInputStream inputStream = new FileInputStream(positionFile)) {
- byte[] insertPositionData = new byte[8];
- byte[] updatePositionData = new byte[8];
- int byteRead = inputStream.read(insertPositionData);
- if (byteRead != 8) {
- throw new IOException("Not enough bytes for insertPositionData");
- }
- byteRead = inputStream.read(updatePositionData);
- if (byteRead != 8) {
- throw new IOException("Not enough bytes for updatePositionData");
- }
- long lastInsertPosition = BytesUtils.bytesToLong(insertPositionData);
- long lastUpdatePosition = BytesUtils.bytesToLong(updatePositionData);
- return new Pair<>(lastInsertPosition, lastUpdatePosition);
- }
- } else {
- LOGGER.debug("No position info, returning a default value");
- long left = 0;
- long right = 0;
- File insertTempFile = new File(insertFilePath);
- if (insertTempFile.exists()) {
- left = insertTempFile.length();
- }
- return new Pair<>(left, right);
- }
-
- }
-
- private void writePositionInfo(long lastInsertPosition, long lastUpdatePosition)
- throws IOException {
- try(FileOutputStream outputStream = new FileOutputStream(positionFilePath)) {
- byte[] data = new byte[16];
- BytesUtils.longToBytes(lastInsertPosition, data, 0);
- BytesUtils.longToBytes(lastUpdatePosition, data, 8);
- outputStream.write(data);
- }
- }
-
- private void readMetadata() throws IOException {
- // read insert meta-data
- insertIO.toTail();
- long position = insertIO.getPos();
- while (position != TsFileIOWriter.magicStringBytes.length) {
- insertIO.getReader().position(position - FOOTER_LENGTH);
- int metadataLength = insertIO.getReader().readInt();
- byte[] buf = new byte[metadataLength];
- insertIO.getReader().position(position - FOOTER_LENGTH - metadataLength);
- insertIO.getReader().read(buf, 0, buf.length);
- ByteArrayInputStream inputStream = new ByteArrayInputStream(buf);
- TsDeviceMetadata tsDeviceMetadata = TsDeviceMetadata.deserializeFrom(inputStream);
- byte[] bytesPosition = new byte[8];
- insertIO.getReader().position(position - FOOTER_LENGTH - metadataLength - POS_LENGTH);
- insertIO.getReader().read(bytesPosition, 0, POS_LENGTH);
- position = BytesUtils.bytesToLong(bytesPosition);
- for (ChunkGroupMetaData rowGroupMetaData : tsDeviceMetadata.getChunkGroupMetaDataList()) {
- String deviceId = rowGroupMetaData.getDeviceID();
- if (!insertMetadatas.containsKey(deviceId)) {
- insertMetadatas.put(deviceId, new HashMap<>());
- }
- for (ChunkMetaData chunkMetaData : rowGroupMetaData.getChunkMetaDataList()) {
- chunkMetaData.setVersion(rowGroupMetaData.getVersion());
- String measurementId = chunkMetaData.getMeasurementUid();
- if (!insertMetadatas.get(deviceId).containsKey(measurementId)) {
- insertMetadatas.get(deviceId).put(measurementId, new ArrayList<>());
- }
- insertMetadatas.get(deviceId).get(measurementId).add(0, chunkMetaData);
- }
- }
- }
- }
-
- public List<ChunkMetaData> getInsertMetadatas(String deviceId, String measurementId,
- TSDataType dataType, QueryContext context) {
- List<ChunkMetaData> chunkMetaDatas = new ArrayList<>();
- if (insertMetadatas.containsKey(deviceId) && insertMetadatas.get(deviceId)
- .containsKey(measurementId)) {
- for (ChunkMetaData chunkMetaData : insertMetadatas.get(deviceId).get(measurementId)) {
- // filter
- if (chunkMetaData.getTsDataType().equals(dataType)) {
- chunkMetaDatas.add(chunkMetaData);
- }
- }
- }
- try {
- List<Modification> modifications = context.getPathModifications(modificationFile,
- deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId);
- QueryUtils.modifyChunkMetaData(chunkMetaDatas, modifications);
- } catch (IOException e) {
- LOGGER.error("Cannot access the modification file of Overflow {}, because:", parentPath,
- e);
- }
- return chunkMetaDatas;
- }
-
- public void flush(FileSchema fileSchema, IMemTable memTable, String processorName,
- long flushId, MemTableFlushCallBack removeFlushedMemTable)
- throws IOException {
- // insert data
- long startPos = insertIO.getPos();
- long startTime = System.currentTimeMillis();
- flush2(fileSchema, memTable, processorName, flushId, removeFlushedMemTable);
- long timeInterval = System.currentTimeMillis() - startTime;
- timeInterval = timeInterval == 0 ? 1 : timeInterval;
- long insertSize = insertIO.getPos() - startPos;
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info(
- "Overflow processor {} flushes overflow insert data, actual:{}, time consumption:{} ms,"
- + " flush rate:{}/s",
- processorName, MemUtils.bytesCntToStr(insertSize), timeInterval,
- MemUtils.bytesCntToStr(insertSize / timeInterval * 1000));
- }
-
- writePositionInfo(insertIO.getPos(), 0);
- }
-
- public void flush2(FileSchema fileSchema, IMemTable memTable, String processorName,
- long flushId, MemTableFlushCallBack removeFlushedMemTable) throws IOException {
- if (memTable != null && !memTable.isEmpty()) {
- insertIO.toTail();
- long lastPosition = insertIO.getPos();
-// MemTableFlushUtil.flushMemTable(fileSchema, insertIO, memTable,
-// versionController.nextVersion());
-// MemTableFlushTask task = new MemTableFlushTask(insertIO, processorName);
- MemTableFlushTask tableFlushTask = new MemTableFlushTask(insertIO, processorName, flushId,
- removeFlushedMemTable);
- tableFlushTask.flushMemTable(fileSchema, memTable, versionController.nextVersion());
-
- List<ChunkGroupMetaData> rowGroupMetaDatas = insertIO.getChunkGroupMetaDatas();
- appendInsertMetadatas.addAll(rowGroupMetaDatas);
- if (!rowGroupMetaDatas.isEmpty()) {
- insertIO.getWriter().write(BytesUtils.longToBytes(lastPosition));
- TsDeviceMetadata tsDeviceMetadata = new TsDeviceMetadata();
- tsDeviceMetadata.setChunkGroupMetadataList(rowGroupMetaDatas);
- long start = insertIO.getPos();
- tsDeviceMetadata.serializeTo(insertIO.getOutputStream());
- long end = insertIO.getPos();
- insertIO.getWriter().write(BytesUtils.intToBytes((int) (end - start)));
- // clear the meta-data of insert IO
- insertIO.clearRowGroupMetadatas();
- }
- }
- }
-
- public void appendMetadatas() {
- if (!appendInsertMetadatas.isEmpty()) {
- for (ChunkGroupMetaData rowGroupMetaData : appendInsertMetadatas) {
- for (ChunkMetaData seriesChunkMetaData : rowGroupMetaData.getChunkMetaDataList()) {
- addInsertMetadata(rowGroupMetaData.getDeviceID(), seriesChunkMetaData.getMeasurementUid(),
- seriesChunkMetaData);
- }
- }
- appendInsertMetadatas.clear();
- }
- }
-
- public String getInsertFilePath() {
- return insertFilePath;
- }
-
- public File getInsertFile() {
- return insertFile;
- }
-
- public String getPositionFilePath() {
- return positionFilePath;
- }
-
- public void close() throws IOException {
- insertMetadatas.clear();
- insertIO.close();
- modificationFile.close();
- }
-
- public void deleteResource() throws IOException {
- FileUtils.forceDelete(new File(parentPath, dataPath));
- }
-
- private void cleanDir(String dir) throws IOException {
- File file = new File(dir);
- if (file.exists()) {
- if (file.isDirectory()) {
- File[] files = file.listFiles();
- if (files == null) {
- return;
- }
- for (File subFile : files) {
- cleanDir(subFile.getAbsolutePath());
- }
- }
- if (!file.delete()) {
- throw new IOException(String.format("The file %s can't be deleted", dir));
- }
- }
- }
-
- private void addInsertMetadata(String deviceId, String measurementId,
- ChunkMetaData chunkMetaData) {
- if (!insertMetadatas.containsKey(deviceId)) {
- insertMetadatas.put(deviceId, new HashMap<>());
- }
- if (!insertMetadatas.get(deviceId).containsKey(measurementId)) {
- insertMetadatas.get(deviceId).put(measurementId, new ArrayList<>());
- }
- insertMetadatas.get(deviceId).get(measurementId).add(chunkMetaData);
- }
-
- /**
- * Delete data of a timeseries whose time ranges from 0 to timestamp.
- *
- * @param deviceId the deviceId of the timeseries.
- * @param measurementId the measurementId of the timeseries.
- * @param timestamp the upper-bound of deletion time.
- * @param updatedModFiles add successfully updated modificationFile to this list, so that the
- * deletion can be aborted when exception is thrown.
- */
- public void delete(String deviceId, String measurementId, long timestamp, long version,
- List<ModificationFile> updatedModFiles)
- throws IOException {
- modificationFile.write(new Deletion(new Path(deviceId, measurementId), version, timestamp));
- updatedModFiles.add(modificationFile);
- }
-
- public WriteLogNode getLogNode() throws IOException {
- if (logNode == null) {
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- logNode = MultiFileLogNodeManager.getInstance().getNode(
- logNodePrefix() + insertFile.getName());
- }
- }
- return logNode;
- }
-
- public String logNodePrefix() {
- return processorName + "-Overflow-";
- }
-
- public ModificationFile getModificationFile() {
- return modificationFile;
- }
-
- public OverflowIO getInsertIO() {
- return insertIO;
- }
-
- public List<ChunkGroupMetaData> getAppendInsertMetadatas() {
- return appendInsertMetadatas;
- }
-
- public VersionController getVersionController() {
- return versionController;
- }
-}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowedTsFileIOWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowedTsFileIOWriter.java
deleted file mode 100644
index db86a78..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowedTsFileIOWriter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.overflow.io;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import org.apache.iotdb.tsfile.write.writer.DefaultTsFileOutput;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
-public class OverflowedTsFileIOWriter extends TsFileIOWriter {
-
- public OverflowedTsFileIOWriter(File file) throws FileNotFoundException {
- super();
- this.out = new DefaultTsFileOutput(file, true);
-
- }
-
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFFileMetadata.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFFileMetadata.java
deleted file mode 100644
index 5297aa6..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFFileMetadata.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.metadata;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-public class OFFileMetadata {
-
- private long lastFooterOffset;
- private List<OFRowGroupListMetadata> rowGroupLists;
-
- public OFFileMetadata() {
- //allowed to do nothing
- }
-
- public OFFileMetadata(long lastFooterOffset, List<OFRowGroupListMetadata> rowGroupLists) {
- this.lastFooterOffset = lastFooterOffset;
- this.rowGroupLists = rowGroupLists;
- }
-
- /**
- * function for deserializing data from input stream.
- */
- public static OFFileMetadata deserializeFrom(InputStream inputStream) throws IOException {
- long lastFooterOffset = ReadWriteIOUtils.readLong(inputStream);
- int size = ReadWriteIOUtils.readInt(inputStream);
- List<OFRowGroupListMetadata> list = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- list.add(OFRowGroupListMetadata.deserializeFrom(inputStream));
- }
- return new OFFileMetadata(lastFooterOffset, list);
- }
-
- public static OFFileMetadata deserializeFrom(ByteBuffer buffer) throws IOException {
- throw new IOException("The function has not been implemented.");
- }
-
- /**
- * add OFRowGroupListMetadata to list.
- */
- public void addRowGroupListMetaData(OFRowGroupListMetadata rowGroupListMetadata) {
- if (rowGroupLists == null) {
- rowGroupLists = new ArrayList<>();
- }
- rowGroupLists.add(rowGroupListMetadata);
- }
-
- public List<OFRowGroupListMetadata> getRowGroupLists() {
- return rowGroupLists == null ? null : Collections.unmodifiableList(rowGroupLists);
- }
-
- public long getLastFooterOffset() {
- return lastFooterOffset;
- }
-
- public void setLastFooterOffset(long lastFooterOffset) {
- this.lastFooterOffset = lastFooterOffset;
- }
-
- @Override
- public String toString() {
- return String.format("OFFileMetadata{ last offset: %d, RowGroupLists: %s }", lastFooterOffset,
- rowGroupLists.toString());
- }
-
- /**
- * function for serializing data to output stream.
- */
- public int serializeTo(OutputStream outputStream) throws IOException {
- int byteLen = 0;
- byteLen += ReadWriteIOUtils.write(lastFooterOffset, outputStream);
- int size = rowGroupLists.size();
- byteLen += ReadWriteIOUtils.write(size, outputStream);
- for (OFRowGroupListMetadata ofRowGroupListMetadata : rowGroupLists) {
- byteLen += ofRowGroupListMetadata.serializeTo(outputStream);
- }
- return byteLen;
- }
-
- public int serializeTo(ByteBuffer buffer) throws IOException {
- throw new IOException("The function has not been implemented.");
- }
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFRowGroupListMetadata.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFRowGroupListMetadata.java
deleted file mode 100644
index 274fb03..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFRowGroupListMetadata.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.metadata;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-/**
- * Metadata of overflow RowGroup list.
- */
-public class OFRowGroupListMetadata {
-
- private String deviceId;
- private List<OFSeriesListMetadata> seriesList;
-
- private OFRowGroupListMetadata() {
- }
-
- public OFRowGroupListMetadata(String deviceId) {
- this.deviceId = deviceId;
- seriesList = new ArrayList<>();
- }
-
- /**
- * function for deserializing data from input stream.
- */
- public static OFRowGroupListMetadata deserializeFrom(InputStream inputStream) throws IOException {
- OFRowGroupListMetadata ofRowGroupListMetadata = new OFRowGroupListMetadata();
- ofRowGroupListMetadata.deviceId = ReadWriteIOUtils.readString(inputStream);
- int size = ReadWriteIOUtils.readInt(inputStream);
- List<OFSeriesListMetadata> list = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- list.add(OFSeriesListMetadata.deserializeFrom(inputStream));
- }
- ofRowGroupListMetadata.seriesList = list;
- return ofRowGroupListMetadata;
- }
-
- public static OFRowGroupListMetadata deserializeFrom(ByteBuffer buffer) throws IOException {
- throw new IOException("The function has not been implemented.");
- }
-
- /**
- * add OFSeriesListMetadata metadata to list.
- */
- public void addSeriesListMetaData(OFSeriesListMetadata timeSeries) {
- if (seriesList == null) {
- seriesList = new ArrayList<>();
- }
- seriesList.add(timeSeries);
- }
-
- public List<OFSeriesListMetadata> getSeriesList() {
- return seriesList == null ? null : Collections.unmodifiableList(seriesList);
- }
-
- @Override
- public String toString() {
- return String.format("OFRowGroupListMetadata{ deviceId id: %s, series Lists: %s }", deviceId,
- seriesList.toString());
- }
-
- public String getdeviceId() {
- return deviceId;
- }
-
- /**
- * function for serialing data to output stream.
- */
- public int serializeTo(OutputStream outputStream) throws IOException {
- int byteLen = 0;
- byteLen += ReadWriteIOUtils.write(deviceId, outputStream);
- int size = seriesList.size();
- byteLen += ReadWriteIOUtils.write(size, outputStream);
- for (OFSeriesListMetadata ofSeriesListMetadata : seriesList) {
- byteLen += ofSeriesListMetadata.serializeTo(outputStream);
- }
- return byteLen;
- }
-
- /**
- * function for serializing data to byte buffer.
- */
- public int serializeTo(ByteBuffer buffer) throws IOException {
- throw new IOException("The function has not been implemented.");
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFSeriesListMetadata.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFSeriesListMetadata.java
deleted file mode 100644
index 40f7302..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFSeriesListMetadata.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.metadata;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-/**
- * Metadata of overflow series list.
- */
-public class OFSeriesListMetadata {
-
- private String measurementId;
- private List<ChunkMetaData> timeSeriesList;
-
- private OFSeriesListMetadata() {
- }
-
- public OFSeriesListMetadata(String measurementId, List<ChunkMetaData> timeSeriesList) {
- this.measurementId = measurementId;
- this.timeSeriesList = timeSeriesList;
- }
-
- /**
- * function for deserializing data from input stream.
- */
- public static OFSeriesListMetadata deserializeFrom(InputStream inputStream) throws IOException {
- OFSeriesListMetadata ofSeriesListMetadata = new OFSeriesListMetadata();
- ofSeriesListMetadata.measurementId = ReadWriteIOUtils.readString(inputStream);
- int size = ReadWriteIOUtils.readInt(inputStream);
- List<ChunkMetaData> list = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- ChunkMetaData chunkMetaData = ChunkMetaData.deserializeFrom(inputStream);
- list.add(chunkMetaData);
- }
- ofSeriesListMetadata.timeSeriesList = list;
- return ofSeriesListMetadata;
- }
-
- public static OFSeriesListMetadata deserializeFrom(ByteBuffer buffer) throws IOException {
- throw new IOException("The function has not been implemented.");
- }
-
- /**
- * add TimeSeriesChunkMetaData to timeSeriesList.
- */
- public void addSeriesMetaData(ChunkMetaData timeSeries) {
- if (timeSeriesList == null) {
- timeSeriesList = new ArrayList<>();
- }
- timeSeriesList.add(timeSeries);
- }
-
- public List<ChunkMetaData> getMetaDatas() {
- return timeSeriesList == null ? null : Collections.unmodifiableList(timeSeriesList);
- }
-
- @Override
- public String toString() {
- return String.format("OFSeriesListMetadata{ measurementId id: %s, series: %s }", measurementId,
- timeSeriesList.toString());
- }
-
- public String getMeasurementId() {
- return measurementId;
- }
-
- /**
- * function for serializing data to output stream.
- */
- public int serializeTo(OutputStream outputStream) throws IOException {
- int byteLen = 0;
- byteLen += ReadWriteIOUtils.write(measurementId, outputStream);
- byteLen += ReadWriteIOUtils.write(timeSeriesList.size(), outputStream);
- for (ChunkMetaData chunkMetaData : timeSeriesList) {
- byteLen += chunkMetaData.serializeTo(outputStream);
- }
- return byteLen;
- }
-
- public int serializeTo(ByteBuffer buffer) throws IOException {
- throw new IOException("The function has not been implemented.");
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/utils/MergeStatus.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/utils/MergeStatus.java
deleted file mode 100644
index 9b9aa85..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/utils/MergeStatus.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.utils;
-
-/**
- * Used for IntervalTreeOperation.queryMemory() and IntervalTreeOperation.queryFileBlock().
- *
- * <p>DONE means that a time pair is not used or this time pair has been merged into a new
- * DynamicOneColumn MERGING means that a time pair is merging into a new DynamicOneColumn
- */
-public enum MergeStatus {
- DONE, MERGING
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/utils/OverflowOpType.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/utils/OverflowOpType.java
deleted file mode 100644
index ba4c4f9..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/utils/OverflowOpType.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.overflow.utils;
-
-/**
- * Include three types: INSERT,UPDATE,DELETE; INSERT is an operation which inserts a time point.<br>
- * UPDATE is an operation which updates a time range.<br> DELETE is an operation which deletes a
- * time range. Note that DELETE operation could only delete a time which is less than given time T.
- * <br>
- */
-public enum OverflowOpType {
- INSERT, UPDATE, DELETE
-}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java
index 176a30e..374c790 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.engine.querycontext;
import java.util.List;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
import org.apache.iotdb.tsfile.read.common.Path;
public class GlobalSortedSeriesDataSource {
@@ -27,7 +27,7 @@ public class GlobalSortedSeriesDataSource {
private Path seriesPath;
// sealed tsfile
- private List<TsFileResource> sealedTsFiles;
+ private List<TsFileResourceV2> sealedTsFiles;
// unsealed tsfile
private UnsealedTsFile unsealedTsFile;
@@ -35,7 +35,7 @@ public class GlobalSortedSeriesDataSource {
// seq mem-table
private ReadOnlyMemChunk readableChunk;
- public GlobalSortedSeriesDataSource(Path seriesPath, List<TsFileResource> sealedTsFiles,
+ public GlobalSortedSeriesDataSource(Path seriesPath, List<TsFileResourceV2> sealedTsFiles,
UnsealedTsFile unsealedTsFile,
ReadOnlyMemChunk readableChunk) {
this.seriesPath = seriesPath;
@@ -49,11 +49,11 @@ public class GlobalSortedSeriesDataSource {
return sealedTsFiles != null && !sealedTsFiles.isEmpty();
}
- public List<TsFileResource> getSealedTsFiles() {
+ public List<TsFileResourceV2> getSealedTsFiles() {
return sealedTsFiles;
}
- public void setSealedTsFiles(List<TsFileResource> sealedTsFiles) {
+ public void setSealedTsFiles(List<TsFileResourceV2> sealedTsFiles) {
this.sealedTsFiles = sealedTsFiles;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
deleted file mode 100644
index f63915e..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.querycontext;
-
-public class QueryDataSource {
-
- // sequence data source
- private GlobalSortedSeriesDataSource seriesDataSource;
-
- // unSequence data source
- private OverflowSeriesDataSource overflowSeriesDataSource;
-
- public QueryDataSource(GlobalSortedSeriesDataSource seriesDataSource,
- OverflowSeriesDataSource overflowSeriesDataSource) {
- this.seriesDataSource = seriesDataSource;
- this.overflowSeriesDataSource = overflowSeriesDataSource;
- }
-
- public GlobalSortedSeriesDataSource getSeqDataSource() {
- return seriesDataSource;
- }
-
- public OverflowSeriesDataSource getOverflowSeriesDataSource() {
- return overflowSeriesDataSource;
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
index 5a17544..f5d9758 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
@@ -21,10 +21,7 @@ package org.apache.iotdb.db.query.control;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
-import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
/**
@@ -54,27 +51,6 @@ public class JobFileManager {
unsealedFilePathsMap.computeIfAbsent(jobId, x -> new HashSet<>());
}
- /**
- * Add the unique file paths to sealedFilePathsMap and unsealedFilePathsMap.
- */
- public void addUsedFilesForGivenJob(long jobId, QueryDataSource dataSource) {
- for (TsFileResource tsFileResource : dataSource.getSeqDataSource().getSealedTsFiles()) {
- String sealedFilePath = tsFileResource.getFilePath();
- addFilePathToMap(jobId, sealedFilePath, true);
- }
-
- if (dataSource.getSeqDataSource().hasUnsealedTsFile()) {
- String unSealedFilePath = dataSource.getSeqDataSource().getUnsealedTsFile().getFilePath();
- addFilePathToMap(jobId, unSealedFilePath, false);
- }
-
- for (OverflowInsertFile overflowInsertFile : dataSource.getOverflowSeriesDataSource()
- .getOverflowInsertFileList()) {
- String overflowFilePath = overflowInsertFile.getFilePath();
- // overflow is unclosed by default
- addFilePathToMap(jobId, overflowFilePath, false);
- }
- }
/**
* Add the unique file paths to sealedFilePathsMap and unsealedFilePathsMap.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index 20c9018..36d5c37 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -26,7 +26,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.engine.filenodeV2.FileNodeManagerV2;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.ProcessorException;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/rescon/PrimitiveDataListPool.java b/iotdb/src/main/java/org/apache/iotdb/db/rescon/PrimitiveDataListPool.java
new file mode 100644
index 0000000..a1dfd5f
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/rescon/PrimitiveDataListPool.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.rescon;
+
+import java.util.ArrayDeque;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.iotdb.db.utils.PrimitiveArrayListV2;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pool of reusable fixed-size data arrays, one queue of idle arrays per {@link TSDataType}.
+ *
+ * <p>Arrays are borrowed with {@link #getPrimitiveDataListByType(TSDataType)} or
+ * {@link #getDataListsByType(TSDataType, int)} and handed back with {@link #release(Object)}.
+ * The borrow/release methods are synchronized on the singleton instance.
+ */
+public class PrimitiveDataListPool {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PrimitiveDataListPool.class);
+
+  /** Length of every array handed out by this pool. */
+  public static final int ARRAY_SIZE = 128;
+
+  /** Idle arrays waiting to be reused, keyed by the data type they store. */
+  private static final EnumMap<TSDataType, ArrayDeque<Object>> primitiveDataListsMap =
+      new EnumMap<>(TSDataType.class);
+
+  static {
+    primitiveDataListsMap.put(TSDataType.BOOLEAN, new ArrayDeque<>());
+    primitiveDataListsMap.put(TSDataType.INT32, new ArrayDeque<>());
+    primitiveDataListsMap.put(TSDataType.INT64, new ArrayDeque<>());
+    primitiveDataListsMap.put(TSDataType.FLOAT, new ArrayDeque<>());
+    primitiveDataListsMap.put(TSDataType.DOUBLE, new ArrayDeque<>());
+    primitiveDataListsMap.put(TSDataType.TEXT, new ArrayDeque<>());
+  }
+
+  private static final PrimitiveDataListPool INSTANCE = new PrimitiveDataListPool();
+
+  private PrimitiveDataListPool() {
+  }
+
+  public static PrimitiveDataListPool getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Borrows one array of {@link #ARRAY_SIZE} elements for the given type, reusing an idle one
+   * when available. A reused array is returned as-is, so it may still hold old values.
+   *
+   * @param dataType element type of the requested array
+   * @return a boolean[], int[], long[], float[], double[] or Binary[] matching the type
+   * @throws UnSupportedDataTypeException if the pool has no array kind for the type
+   */
+  public synchronized Object getPrimitiveDataListByType(TSDataType dataType) {
+    ArrayDeque<Object> idleArrays = primitiveDataListsMap.get(dataType);
+    Object pooled = idleArrays == null ? null : idleArrays.poll();
+    if (pooled != null) {
+      return pooled;
+    }
+    // Nothing idle for this type: allocate a fresh array.
+    switch (dataType) {
+      case BOOLEAN:
+        return new boolean[ARRAY_SIZE];
+      case INT32:
+        return new int[ARRAY_SIZE];
+      case INT64:
+        return new long[ARRAY_SIZE];
+      case FLOAT:
+        return new float[ARRAY_SIZE];
+      case DOUBLE:
+        return new double[ARRAY_SIZE];
+      case TEXT:
+        return new Binary[ARRAY_SIZE];
+      default:
+        throw new UnSupportedDataTypeException("DataType: " + dataType);
+    }
+  }
+
+  /**
+   * Returns a borrowed array to the pool so a later request can reuse it. The contents are not
+   * cleared. Null, arrays of an unsupported kind, and arrays whose length is not
+   * {@link #ARRAY_SIZE} are ignored, because pooling them would hand a wrongly sized array to
+   * the next borrower. The pool does not detect duplicates, so callers must not release the
+   * same array twice.
+   *
+   * @param dataArray an array previously obtained from this pool
+   */
+  public synchronized void release(Object dataArray) {
+    TSDataType dataType;
+    if (dataArray instanceof boolean[]) {
+      dataType = TSDataType.BOOLEAN;
+    } else if (dataArray instanceof int[]) {
+      dataType = TSDataType.INT32;
+    } else if (dataArray instanceof long[]) {
+      dataType = TSDataType.INT64;
+    } else if (dataArray instanceof float[]) {
+      dataType = TSDataType.FLOAT;
+    } else if (dataArray instanceof double[]) {
+      dataType = TSDataType.DOUBLE;
+    } else if (dataArray instanceof Binary[]) {
+      dataType = TSDataType.TEXT;
+    } else {
+      return;
+    }
+    if (java.lang.reflect.Array.getLength(dataArray) == ARRAY_SIZE) {
+      primitiveDataListsMap.get(dataType).add(dataArray);
+    }
+  }
+
+  /**
+   * Borrows enough arrays to hold {@code size} elements, i.e. ceil(size / ARRAY_SIZE) arrays.
+   *
+   * @param dataType element type of the requested arrays
+   * @param size needed capacity; values of zero or less yield an empty outer array
+   * @return an array of primitive data arrays (e.g. int[][]), or null for an unsupported type
+   */
+  public synchronized Object getDataListsByType(TSDataType dataType, int size) {
+    // Integer ceiling division: float division loses precision for sizes above 2^24 and
+    // could yield one array too few.
+    int arrayNumber = size <= 0 ? 0 : (size - 1) / ARRAY_SIZE + 1;
+    switch (dataType) {
+      case BOOLEAN:
+        boolean[][] booleans = new boolean[arrayNumber][];
+        for (int i = 0; i < arrayNumber; i++) {
+          booleans[i] = (boolean[]) getPrimitiveDataListByType(dataType);
+        }
+        return booleans;
+      case INT32:
+        int[][] ints = new int[arrayNumber][];
+        for (int i = 0; i < arrayNumber; i++) {
+          ints[i] = (int[]) getPrimitiveDataListByType(dataType);
+        }
+        return ints;
+      case INT64:
+        long[][] longs = new long[arrayNumber][];
+        for (int i = 0; i < arrayNumber; i++) {
+          longs[i] = (long[]) getPrimitiveDataListByType(dataType);
+        }
+        return longs;
+      case FLOAT:
+        float[][] floats = new float[arrayNumber][];
+        for (int i = 0; i < arrayNumber; i++) {
+          floats[i] = (float[]) getPrimitiveDataListByType(dataType);
+        }
+        return floats;
+      case DOUBLE:
+        double[][] doubles = new double[arrayNumber][];
+        for (int i = 0; i < arrayNumber; i++) {
+          doubles[i] = (double[]) getPrimitiveDataListByType(dataType);
+        }
+        return doubles;
+      case TEXT:
+        Binary[][] binaries = new Binary[arrayNumber][];
+        for (int i = 0; i < arrayNumber; i++) {
+          binaries[i] = (Binary[]) getPrimitiveDataListByType(dataType);
+        }
+        return binaries;
+      default:
+        return null;
+    }
+  }
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
index 82c8b4b..c799c65 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
@@ -43,9 +43,8 @@ import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
-import org.apache.iotdb.db.engine.filenode.OverflowChangeType;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.filenodeV2.FileNodeManagerV2;
+import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.MetadataArgsErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -469,9 +468,9 @@ public class SyncServiceImpl implements SyncService.Iface {
// create a new fileNode
String header = syncDataPath;
String relativePath = path.substring(header.length());
- TsFileResource fileNode = new TsFileResource(startTimeMap, endTimeMap,
- OverflowChangeType.NO_CHANGE, new File(
- DirectoryManager.getInstance().getNextFolderIndexForTsFile() + File.separator + relativePath)
+ TsFileResourceV2 fileNode = new TsFileResourceV2(
+ new File(DirectoryManager.getInstance().getNextFolderIndexForTsFile() +
+ File.separator + relativePath), startTimeMap, endTimeMap
);
// call interface of load external file
try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/tools/TsFileChecker.java b/iotdb/src/main/java/org/apache/iotdb/db/tools/TsFileChecker.java
deleted file mode 100644
index 6f7dcc7..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/tools/TsFileChecker.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.tools;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
-import org.apache.iotdb.db.tools.TsFileChecker.TsFileStatus;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.constant.SystemConstant;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
-import org.apache.iotdb.tsfile.file.MetaMarker;
-import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
-import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TsFileChecker implements Callable<TsFileStatus> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(TsFileChecker.class);
-
- private String path;
-
- public TsFileChecker(String path) {
- this.path = path;
- }
-
- public TsFileStatus checkTsFile(String path) {
- if (path.endsWith(RestorableTsFileIOWriter.RESTORE_SUFFIX)) {
- return TsFileStatus.RESTORE;
- }
-
- LOGGER.info("Start to check tsfile {}.", path);
- TsFileSequenceReader reader = null;
- long lastPosition = Long.MAX_VALUE;
- boolean timeSeq = true;
- try {
- boolean complete = true;
- String restoreFilePath = path + RestorableTsFileIOWriter.RESTORE_SUFFIX;
- long tsFileLen = new File(path).length();
- try {
- lastPosition = readLastPositionFromRestoreFile(restoreFilePath);
- complete = false;
- } catch (IOException ex) {
- lastPosition = tsFileLen;
- complete = true;
- }
-
- if (lastPosition > tsFileLen) {
- return TsFileStatus.UNCLOSED_DAMAGED;
- }
- if (lastPosition <= TSFileConfig.MAGIC_STRING.length()) {
- return TsFileStatus.UNCLOSED_INTACT;
- }
-
- reader = new TsFileSequenceReader(path, complete);
- if (!complete) {
- reader.position(TSFileConfig.MAGIC_STRING.length());
- }
- reader.readHeadMagic();
- byte marker;
- Map<String, long[]> timeRangeMap = new HashMap<>();
- Map<String, long[]> measurementTimeRangeMap = new HashMap<>();
- while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
- if (reader.position() > lastPosition) {
- return timeSeq ? TsFileStatus.UNCLOSED_INTACT : TsFileStatus.UNCLOSED_INTACT_TIME_UNSEQ;
- }
-
- switch (marker) {
- case MetaMarker.CHUNK_HEADER:
- ChunkHeader header = reader.readChunkHeader();
- List<Long> timeList = new ArrayList<>();
- Decoder defaultTimeDecoder = Decoder.getDecoderByType(
- TSEncoding.valueOf(TSFileConfig.timeSeriesEncoder),
- TSDataType.INT64);
- Decoder valueDecoder = Decoder
- .getDecoderByType(header.getEncodingType(), header.getDataType());
- for (int j = 0; j < header.getNumOfPages(); j++) {
- PageHeader pageHeader = reader.readPageHeader(header.getDataType());
- ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
- PageReader reader1 = new PageReader(pageData, header.getDataType(), valueDecoder,
- defaultTimeDecoder);
- while (reader1.hasNextBatch()) {
- BatchData batchData = reader1.nextBatch();
- while (batchData.hasNext()) {
- long time = batchData.currentTime();
- timeList.add(time);
- batchData.currentValue();
- batchData.next();
- }
- }
- }
- if (timeSeq) {
- timeSeq = isTimeSequential(timeList);
- if (timeList.size() > 0) {
- long startTime = timeList.get(0);
- long endTime = timeList.get(timeList.size() - 1);
- timeSeq = updateTimeRange(measurementTimeRangeMap, header.getMeasurementID(), startTime, endTime);
- }
- }
- break;
- case MetaMarker.CHUNK_GROUP_FOOTER:
- ChunkGroupFooter chunkGroupFooter = reader.readChunkGroupFooter();
- String deviceId = chunkGroupFooter.getDeviceID();
-
- if (timeSeq) {
- for (Entry<String, long[]> entry : measurementTimeRangeMap.entrySet()) {
- String key = deviceId + SystemConstant.PATH_SEPARATOR + entry.getKey();
- long[] range = entry.getValue();
- timeSeq = updateTimeRange(timeRangeMap, key, range[0], range[1]);
- if (!timeSeq) {
- break;
- }
- }
- measurementTimeRangeMap.clear();
- }
- if (reader.position() == lastPosition) {
- return timeSeq ? TsFileStatus.UNCLOSED_INTACT : TsFileStatus.UNCLOSED_INTACT_TIME_UNSEQ;
- }
- break;
- default:
- MetaMarker.handleUnexpectedMarker(marker);
- }
- }
-
- TsFileMetaData metaData = reader.readFileMetadata();
- List<TsDeviceMetadataIndex> deviceMetadataIndexList = metaData.getDeviceMap().values().stream()
- .sorted((x, y) -> (int) (x.getOffset() - y.getOffset())).collect(Collectors.toList());
- for (TsDeviceMetadataIndex index : deviceMetadataIndexList) {
- TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
- deviceMetadata.getChunkGroupMetaDataList();
- }
- reader.readTailMagic();
- reader.close();
-
- return timeSeq ? TsFileStatus.INTACT : TsFileStatus.INTACT_TIME_UNSEQ;
- } catch (Exception ex) {
- System.out.println(ex);
- try {
- if (reader != null && reader.position() > lastPosition) {
- return TsFileStatus.UNCLOSED_DAMAGED;
- } else {
- return TsFileStatus.DAMAGED;
- }
- } catch (IOException e) {
- return TsFileStatus.DAMAGED;
- }
- }
- }
-
- private boolean updateTimeRange(Map<String, long[]> rangeMap, String key, long startTime, long endTime) {
- if (!rangeMap.containsKey(key)) {
- rangeMap.put(key, new long[]{startTime, endTime});
- return true;
- }
-
- long[] range = rangeMap.get(key);
- if (range[1] >= startTime) {
- return false;
- }
- range[1] = endTime;
- return true;
- }
-
- private boolean isTimeSequential(List<Long> timeList) {
- if (timeList == null || timeList.size() <= 1) {
- return true;
- }
- long preTime = timeList.get(0);
- for (int i = 1; i < timeList.size(); i++) {
- long time = timeList.get(i);
- if (preTime >= time) {
- return false;
- }
- preTime = time;
- }
- return true;
- }
-
- private long readLastPositionFromRestoreFile(String path) throws IOException {
- int tsfilePositionByteSize = RestorableTsFileIOWriter.getTsPositionByteSize();
- byte[] lastPostionBytes = new byte[tsfilePositionByteSize];
- RandomAccessFile randomAccessFile = null;
- randomAccessFile = new RandomAccessFile(path, "r");
-
- long fileLength = randomAccessFile.length();
- randomAccessFile.seek(fileLength - tsfilePositionByteSize);
- randomAccessFile.read(lastPostionBytes);
- long lastPosition = BytesUtils.bytesToLong(lastPostionBytes);
- randomAccessFile.close();
- return lastPosition;
- }
-
- public static List<File> getFileList(String dirPath) {
- List<File> res = new ArrayList<>();
- File dir = new File(dirPath);
- if (dir.isFile()) {
- res.add(dir);
- return res;
- }
-
- File[] files = dir.listFiles();
- if (files != null) {
- for (int i = 0; i < files.length; i++) {
- if (files[i].isDirectory()) {
- res.addAll(getFileList(files[i].getAbsolutePath()));
- } else {
- res.add(files[i]);
- }
- }
-
- }
- return res;
- }
-
- public static List<File> getLastDirList(String dirPath) {
- List<File> res = new ArrayList<>();
- File dir = new File(dirPath);
- if (dir.isFile()) {
- return Collections.emptyList();
- }
-
- File[] files = dir.listFiles();
- if (files != null) {
- for (int i = 0; i < files.length; i++) {
- if (files[i].isDirectory()) {
- res.addAll(getLastDirList(files[i].getAbsolutePath()));
- } else {
- res.add(dir);
- return res;
- }
- }
-
- }
- return res;
- }
-
- @Override
- public TsFileStatus call() throws Exception {
- TsFileStatus status = checkTsFile(path);
- LOGGER.info("TsFile {}: {}", path, status);
- return status;
- }
-
- enum TsFileStatus {
- INTACT,
- INTACT_TIME_UNSEQ,
- DAMAGED,
- UNCLOSED_INTACT,
- UNCLOSED_INTACT_TIME_UNSEQ,
- UNCLOSED_DAMAGED,
- RESTORE
- }
-
- public static void main(String[] args) throws Exception {
- if (args == null || args.length < 1) {
- System.out.println("Please input root dir!");
- System.exit(1);
- }
-
- String root = args[0];
-
- List<File> fileList = TsFileChecker.getFileList(root);
- System.out.println("Num of file: " + fileList.size());
- List<Pair<String, Future<TsFileStatus>>> futureList = new ArrayList<>();
- ExecutorService executorService = Executors.newFixedThreadPool(fileList.size());
- for (File file : fileList) {
- String path = file.getAbsolutePath();
- TsFileChecker checker = new TsFileChecker(path);
- futureList.add(new Pair<>(path, executorService.submit(checker)));
- }
-
- Map<TsFileStatus, List<String>> statusMap = new HashMap<>();
- for (TsFileStatus status : TsFileStatus.values()) {
- statusMap.put(status, new ArrayList<>());
- }
- for (Pair<String, Future<TsFileStatus>> pair : futureList) {
- String path = pair.left;
- TsFileStatus status = pair.right.get();
- statusMap.get(status).add(path);
- }
- executorService.shutdown();
-
- System.out.println("TsFile status:");
- for (Entry<TsFileStatus, List<String>> entry : statusMap.entrySet()) {
- System.out.println(entry.getKey() + ": " + entry.getValue().size());
- }
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveArrayList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveArrayList.java
deleted file mode 100755
index c488d69..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveArrayList.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.utils;
-
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.iotdb.db.monitor.collector.MemTableWriteTimeCost;
-import org.apache.iotdb.db.monitor.collector.MemTableWriteTimeCost.MemTableWriteTimeCostType;
-
-public class PrimitiveArrayList {
-
- private static final int MAX_SIZE_OF_ONE_ARRAY = 512;
- private static final int INITIAL_SIZE = 512;
-
- private Class clazz;
- private List<Object> values;
- private List<long[]> timestamps;
-
- private int length; // Total size of all objects of current ArrayList
- private int currentIndex; // current index of array
- private int currentArrayIndex; // current index of element in current array
- private int currentArraySize; // size of current array
-
- public PrimitiveArrayList(Class clazz) {
- this.clazz = clazz;
- values = new ArrayList<>();
- timestamps = new ArrayList<>();
- values.add(Array.newInstance(clazz, INITIAL_SIZE));
- timestamps.add(new long[INITIAL_SIZE]);
- length = 0;
-
- currentIndex = 0;
- currentArraySize = INITIAL_SIZE;
- currentArrayIndex = -1;
- }
-
- private void capacity(int aimSize) {
- if (currentArraySize < aimSize) {
- if (currentArraySize < MAX_SIZE_OF_ONE_ARRAY) {
- long start = System.currentTimeMillis();
- // expand current Array
- int newCapacity = Math.min(MAX_SIZE_OF_ONE_ARRAY, currentArraySize * 2);
- values.set(currentIndex,
- expandArray(values.get(currentIndex), currentArraySize, newCapacity));
- timestamps.set(currentIndex,
- (long[]) expandArray(timestamps.get(currentIndex), currentArraySize, newCapacity));
- currentArraySize = newCapacity;
- MemTableWriteTimeCost.getInstance().measure(MemTableWriteTimeCostType.CAPACITY_1, start);
- } else {
- long start = System.currentTimeMillis();
- // add a new Array to the list
- values.add(Array.newInstance(clazz, INITIAL_SIZE));
- timestamps.add(new long[INITIAL_SIZE]);
- currentIndex++;
- currentArraySize = INITIAL_SIZE;
- currentArrayIndex = -1;
- MemTableWriteTimeCost.getInstance().measure(MemTableWriteTimeCostType.CAPACITY_2, start);
- }
- }
- }
-
- private Object expandArray(Object array, int preLentgh, int aimLength) {
- long start = System.currentTimeMillis();
- Class arrayClass = array.getClass().getComponentType();
- Object newArray = Array.newInstance(arrayClass, aimLength);
- MemTableWriteTimeCost.getInstance().measure(MemTableWriteTimeCostType.EXPAND_ARRAY_1, start);
- start = System.currentTimeMillis();
- System.arraycopy(array, 0, newArray, 0, preLentgh);
- MemTableWriteTimeCost.getInstance().measure(MemTableWriteTimeCostType.EXPAND_ARRAY_2, start);
- return newArray;
- }
-
- public void putTimestamp(long timestamp, Object value) {
- long start = System.currentTimeMillis();
- capacity(currentArrayIndex + 1 + 1);
- MemTableWriteTimeCost.getInstance().measure(MemTableWriteTimeCostType.PUT_TIMESTAMP_1, start);
- start = System.currentTimeMillis();
- currentArrayIndex++;
- timestamps.get(currentIndex)[currentArrayIndex] = timestamp;
- Array.set(values.get(currentIndex), currentArrayIndex, value);
- length++;
- MemTableWriteTimeCost.getInstance().measure(MemTableWriteTimeCostType.PUT_TIMESTAMP_2, start);
- }
-
- public long getTimestamp(int index) {
- checkIndex(index);
- return timestamps.get(index / MAX_SIZE_OF_ONE_ARRAY)[index % MAX_SIZE_OF_ONE_ARRAY];
- }
-
- public Object getValue(int index) {
- checkIndex(index);
- return Array.get(values.get(index / MAX_SIZE_OF_ONE_ARRAY), index % MAX_SIZE_OF_ONE_ARRAY);
- }
-
- private void checkIndex(int index) {
- if (index < 0) {
- throw new NegativeArraySizeException("negetive array index:" + index);
- }
- if (index >= length) {
- throw new ArrayIndexOutOfBoundsException("index: " + index);
- }
- }
-
- public int size() {
- return length;
- }
-
- @Override
- public PrimitiveArrayList clone() {
- PrimitiveArrayList cloneList = new PrimitiveArrayList(clazz);
- cloneList.values.clear();
- cloneList.timestamps.clear();
- for (Object valueArray : values) {
- cloneList.values.add(cloneArray(valueArray, clazz));
- }
- for (Object timestampArray : timestamps) {
- cloneList.timestamps.add((long[]) cloneArray(timestampArray, long.class));
- }
- cloneList.length = length;
- cloneList.currentIndex = currentIndex;
- cloneList.currentArrayIndex = currentArrayIndex;
- cloneList.currentArraySize = currentArraySize;
- return cloneList;
- }
-
- private Object cloneArray(Object array, Class clazz) {
- Object cloneArray = Array.newInstance(clazz, Array.getLength(array));
- System.arraycopy(array, 0, cloneArray, 0, Array.getLength(array));
- return cloneArray;
- }
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveArrayListFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveArrayListFactory.java
deleted file mode 100644
index b1c949d..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveArrayListFactory.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.utils;
-
-import org.apache.iotdb.db.utils.PrimitiveArrayList;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
-
-public class PrimitiveArrayListFactory {
-
- private PrimitiveArrayListFactory(){}
-
- public static PrimitiveArrayList getByDataType(TSDataType dataType) {
- switch (dataType) {
- case BOOLEAN:
- return new PrimitiveArrayList(boolean.class);
- case INT32:
- return new PrimitiveArrayList(int.class);
- case INT64:
- return new PrimitiveArrayList(long.class);
- case FLOAT:
- return new PrimitiveArrayList(float.class);
- case DOUBLE:
- return new PrimitiveArrayList(double.class);
- case TEXT:
- return new PrimitiveArrayList(Binary.class);
- default:
- throw new UnSupportedDataTypeException("DataType: " + dataType);
- }
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveArrayListV2.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveArrayListV2.java
deleted file mode 100644
index 2863db4..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveArrayListV2.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.utils;
-
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.List;
-
-public class PrimitiveArrayListV2 {
-
- private static final int MAX_SIZE_OF_ONE_ARRAY = 512;
- private static final int INITIAL_SIZE = 512;
-
- private Class clazz;
- private List<Object> values;
- private List<long[]> timestamps;
-
- private int totalDataNumber; // Total data number of all objects of current ArrayList
- private int currentArrayIndex; // current index of array
- private int offsetInCurrentArray; // current index of element in current array
- private int currentArraySize; // number of current arrays
-
- public PrimitiveArrayListV2(Class clazz) {
- this.clazz = clazz;
- values = new ArrayList<>();
- timestamps = new ArrayList<>();
- values.add(Array.newInstance(clazz, INITIAL_SIZE));
- timestamps.add(new long[INITIAL_SIZE]);
- totalDataNumber = 0;
-
- currentArrayIndex = 0;
- currentArraySize = INITIAL_SIZE;
- offsetInCurrentArray = -1;
- }
-
- private void checkCapacity(int aimSize) {
- if (currentArraySize < aimSize) {
- if (currentArraySize < MAX_SIZE_OF_ONE_ARRAY) {
- int newCapacity = Math.min(MAX_SIZE_OF_ONE_ARRAY, currentArraySize * 2);
- values.set(currentArrayIndex,
- expandArray(values.get(currentArrayIndex), currentArraySize, newCapacity));
- timestamps.set(currentArrayIndex,
- (long[]) expandArray(timestamps.get(currentArrayIndex), currentArraySize, newCapacity));
- currentArraySize = newCapacity;
- } else {
- // add a new Array to the list
- values.add(Array.newInstance(clazz, INITIAL_SIZE));
- timestamps.add(new long[INITIAL_SIZE]);
- currentArrayIndex++;
- currentArraySize = timestamps.get(currentArrayIndex).length;
- offsetInCurrentArray = -1;
- }
- }
- }
-
- private Object expandArray(Object array, int preLentgh, int aimLength) {
- Class arrayClass = array.getClass().getComponentType();
- Object newArray = Array.newInstance(arrayClass, aimLength);
- System.arraycopy(array, 0, newArray, 0, preLentgh);
- return newArray;
- }
-
- public void putTimestamp(long timestamp, Object value) {
- checkCapacity(offsetInCurrentArray + 1 + 1);
- offsetInCurrentArray++;
- timestamps.get(currentArrayIndex)[offsetInCurrentArray] = timestamp;
- Array.set(values.get(currentArrayIndex), offsetInCurrentArray, value);
- totalDataNumber++;
- }
-
- public long getTimestamp(int index) {
- checkIndex(index);
- return timestamps.get(index / MAX_SIZE_OF_ONE_ARRAY)[index % MAX_SIZE_OF_ONE_ARRAY];
- }
-
- public Object getValue(int index) {
- checkIndex(index);
- return Array.get(values.get(index / MAX_SIZE_OF_ONE_ARRAY), index % MAX_SIZE_OF_ONE_ARRAY);
- }
-
- private void checkIndex(int index) {
- if (index < 0) {
- throw new NegativeArraySizeException("negetive array index:" + index);
- }
- if (index >= totalDataNumber) {
- throw new ArrayIndexOutOfBoundsException("index: " + index);
- }
- }
-
- public int getTotalDataNumber() {
- return totalDataNumber;
- }
-
- @Override
- public PrimitiveArrayListV2 clone() {
- PrimitiveArrayListV2 cloneList = new PrimitiveArrayListV2(clazz);
- cloneList.values.clear();
- cloneList.timestamps.clear();
- for (Object valueArray : values) {
- cloneList.values.add(cloneArray(valueArray, clazz));
- }
- for (Object timestampArray : timestamps) {
- cloneList.timestamps.add((long[]) cloneArray(timestampArray, long.class));
- }
- cloneList.totalDataNumber = totalDataNumber;
- cloneList.currentArrayIndex = currentArrayIndex;
- cloneList.offsetInCurrentArray = offsetInCurrentArray;
- cloneList.currentArraySize = currentArraySize;
- return cloneList;
- }
-
- private Object cloneArray(Object array, Class clazz) {
- Object cloneArray = Array.newInstance(clazz, Array.getLength(array));
- System.arraycopy(array, 0, cloneArray, 0, Array.getLength(array));
- return cloneArray;
- }
-
- public Class getClazz() {
- return clazz;
- }
-
- public void reset(){
- totalDataNumber = 0;
- currentArrayIndex = 0;
- offsetInCurrentArray = -1;
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveDataListPool.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveDataListPool.java
deleted file mode 100644
index 6df1cf3..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveDataListPool.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.utils;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manage all primitive data list in memory, including get and release operation.
- */
-public class PrimitiveDataListPool {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(PrimitiveDataListPool.class);
-
- private static final Map<Class, ConcurrentLinkedQueue<PrimitiveArrayListV2>> primitiveDataListsMap = new ConcurrentHashMap<>();
-
- static {
- primitiveDataListsMap.put(boolean.class, new ConcurrentLinkedQueue<>());
- primitiveDataListsMap.put(int.class, new ConcurrentLinkedQueue<>());
- primitiveDataListsMap.put(long.class, new ConcurrentLinkedQueue<>());
- primitiveDataListsMap.put(float.class, new ConcurrentLinkedQueue<>());
- primitiveDataListsMap.put(double.class, new ConcurrentLinkedQueue<>());
- primitiveDataListsMap.put(Binary.class, new ConcurrentLinkedQueue<>());
- }
-
- private PrimitiveDataListPool() {
- }
-
- public PrimitiveArrayListV2 getPrimitiveDataListByDataType(TSDataType dataType) {
- switch (dataType) {
- case BOOLEAN:
- return getPrimitiveDataList(boolean.class);
- case INT32:
- return getPrimitiveDataList(int.class);
- case INT64:
- return getPrimitiveDataList(long.class);
- case FLOAT:
- return getPrimitiveDataList(float.class);
- case DOUBLE:
- return getPrimitiveDataList(double.class);
- case TEXT:
- return getPrimitiveDataList(Binary.class);
- default:
- throw new UnSupportedDataTypeException("DataType: " + dataType);
- }
- }
-
- private PrimitiveArrayListV2 getPrimitiveDataList(Class clazz) {
- ConcurrentLinkedQueue<PrimitiveArrayListV2> primitiveArrayList = primitiveDataListsMap.get(clazz);
- PrimitiveArrayListV2 dataList = primitiveArrayList.poll();
- return dataList == null ? new PrimitiveArrayListV2(clazz) : dataList;
- }
-
- public void release(PrimitiveArrayListV2 primitiveArrayList) {
- primitiveArrayList.reset();
- primitiveDataListsMap.get(primitiveArrayList.getClazz()).offer(primitiveArrayList);
- }
-
- public static PrimitiveDataListPool getInstance() {
- return PrimitiveDataListPool.InstanceHolder.INSTANCE;
- }
-
- private static class InstanceHolder {
-
- private InstanceHolder() {
- }
-
- private static final PrimitiveDataListPool INSTANCE = new PrimitiveDataListPool();
- }
-
- public int getPrimitiveDataListSizeByDataType(TSDataType dataType){
- switch (dataType) {
- case BOOLEAN:
- return primitiveDataListsMap.get(boolean.class).size();
- case INT32:
- return primitiveDataListsMap.get(int.class).size();
- case INT64:
- return primitiveDataListsMap.get(long.class).size();
- case FLOAT:
- return primitiveDataListsMap.get(float.class).size();
- case DOUBLE:
- return primitiveDataListsMap.get(double.class).size();
- case TEXT:
- return primitiveDataListsMap.get(Binary.class).size();
- default:
- throw new UnSupportedDataTypeException("DataType: " + dataType);
- }
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index b881c13..cb63650 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -18,19 +18,23 @@
*/
package org.apache.iotdb.db.utils.datastructure;
+import static org.apache.iotdb.db.rescon.PrimitiveDataListPool.ARRAY_SIZE;
+
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.db.rescon.PrimitiveDataListPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
public class BinaryTVList extends TVList {
private List<Binary[]> values;
- private Binary[] sortedValues;
+ private Binary[][] sortedValues;
private Binary pivotValue;
- public BinaryTVList() {
+ BinaryTVList() {
super();
values = new ArrayList<>();
}
@@ -38,8 +42,8 @@ public class BinaryTVList extends TVList {
@Override
public void putBinary(long timestamp, Binary value) {
checkExpansion();
- int arrayIndex = size / SINGLE_ARRAY_SIZE;
- int elementIndex = size % SINGLE_ARRAY_SIZE;
+ int arrayIndex = size / ARRAY_SIZE;
+ int elementIndex = size % ARRAY_SIZE;
minTime = minTime <= timestamp ? minTime : timestamp;
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
@@ -52,11 +56,11 @@ public class BinaryTVList extends TVList {
throw new ArrayIndexOutOfBoundsException(index);
}
if (!sorted) {
- int arrayIndex = index / SINGLE_ARRAY_SIZE;
- int elementIndex = index % SINGLE_ARRAY_SIZE;
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
return values.get(arrayIndex)[elementIndex];
} else {
- return sortedValues[index];
+ return sortedValues[index/ARRAY_SIZE][index%ARRAY_SIZE];
}
}
@@ -64,8 +68,8 @@ public class BinaryTVList extends TVList {
if (index >= size) {
throw new ArrayIndexOutOfBoundsException(index);
}
- int arrayIndex = index / SINGLE_ARRAY_SIZE;
- int elementIndex = index % SINGLE_ARRAY_SIZE;
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
}
@@ -79,8 +83,10 @@ public class BinaryTVList extends TVList {
cloneList.values.add(cloneValue(valueArray));
}
} else {
- cloneList.sortedValues = new Binary[size];
- System.arraycopy(sortedValues, 0, cloneList.sortedValues, 0, size);
+ cloneList.sortedValues = new Binary[sortedValues.length][ARRAY_SIZE];
+ for (int i = 0; i < sortedValues.length; i++) {
+ System.arraycopy(sortedValues[i], 0, cloneList.sortedValues[i], 0, ARRAY_SIZE);
+ }
}
return cloneList;
}
@@ -93,18 +99,40 @@ public class BinaryTVList extends TVList {
public void sort() {
if (sortedTimestamps == null || sortedTimestamps.length < size) {
- sortedTimestamps = new long[size];
+ sortedTimestamps = (long[][]) PrimitiveDataListPool.getInstance().getDataListsByType(TSDataType.INT64, size);
}
if (sortedValues == null || sortedValues.length < size) {
- sortedValues = new Binary[size];
+ sortedValues = (Binary[][]) PrimitiveDataListPool.getInstance().getDataListsByType(TSDataType.TEXT, size);
}
sort(0, size);
+ clearTime();
+ clearValue();
sorted = true;
}
@Override
+ void clearValue() {
+ if (values != null) {
+ for (Binary[] dataArray : values) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ values.clear();
+ }
+ }
+
+ @Override
+ void clearSortedValue() {
+ if (sortedValues != null) {
+ for (Binary[] dataArray : sortedValues) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ sortedValues = null;
+ }
+ }
+
+ @Override
protected void setFromSorted(int src, int dest) {
- set(dest, sortedTimestamps[src], sortedValues[src]);
+ set(dest, sortedTimestamps[src/ARRAY_SIZE][src%ARRAY_SIZE], sortedValues[src/ARRAY_SIZE][src%ARRAY_SIZE]);
}
protected void set(int src, int dest) {
@@ -114,8 +142,8 @@ public class BinaryTVList extends TVList {
}
protected void setToSorted(int src, int dest) {
- sortedTimestamps[dest] = getTime(src);
- sortedValues[dest] = getBinary(src);
+ sortedTimestamps[dest/ARRAY_SIZE][dest% ARRAY_SIZE] = getTime(src);
+ sortedValues[dest/ARRAY_SIZE][dest%ARRAY_SIZE] = getBinary(src);
}
protected void reverseRange(int lo, int hi) {
@@ -132,7 +160,8 @@ public class BinaryTVList extends TVList {
@Override
protected void expandValues() {
- values.add(new Binary[SINGLE_ARRAY_SIZE]);
+ values.add((Binary[]) PrimitiveDataListPool
+ .getInstance().getPrimitiveDataListByType(TSDataType.TEXT));
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
index c882750..43ce3ce 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
@@ -18,18 +18,22 @@
*/
package org.apache.iotdb.db.utils.datastructure;
+import static org.apache.iotdb.db.rescon.PrimitiveDataListPool.ARRAY_SIZE;
+
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.db.rescon.PrimitiveDataListPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class BooleanTVList extends TVList {
private List<boolean[]> values;
- private boolean[] sortedValues;
+ private boolean[][] sortedValues;
private boolean pivotValue;
- public BooleanTVList() {
+ BooleanTVList() {
super();
values = new ArrayList<>();
}
@@ -37,8 +41,8 @@ public class BooleanTVList extends TVList {
@Override
public void putBoolean(long timestamp, boolean value) {
checkExpansion();
- int arrayIndex = size / SINGLE_ARRAY_SIZE;
- int elementIndex = size % SINGLE_ARRAY_SIZE;
+ int arrayIndex = size / ARRAY_SIZE;
+ int elementIndex = size % ARRAY_SIZE;
minTime = minTime <= timestamp ? minTime : timestamp;
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
@@ -51,11 +55,11 @@ public class BooleanTVList extends TVList {
throw new ArrayIndexOutOfBoundsException(index);
}
if (!sorted) {
- int arrayIndex = index / SINGLE_ARRAY_SIZE;
- int elementIndex = index % SINGLE_ARRAY_SIZE;
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
return values.get(arrayIndex)[elementIndex];
} else {
- return sortedValues[index];
+ return sortedValues[index/ARRAY_SIZE][index%ARRAY_SIZE];
}
}
@@ -63,8 +67,8 @@ public class BooleanTVList extends TVList {
if (index >= size) {
throw new ArrayIndexOutOfBoundsException(index);
}
- int arrayIndex = index / SINGLE_ARRAY_SIZE;
- int elementIndex = index % SINGLE_ARRAY_SIZE;
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
}
@@ -78,8 +82,10 @@ public class BooleanTVList extends TVList {
cloneList.values.add(cloneValue(valueArray));
}
} else {
- cloneList.sortedValues = new boolean[size];
- System.arraycopy(sortedValues, 0, cloneList.sortedValues, 0, size);
+ cloneList.sortedValues = new boolean[sortedValues.length][ARRAY_SIZE];
+ for (int i = 0; i < sortedValues.length; i++) {
+ System.arraycopy(sortedValues[i], 0, cloneList.sortedValues[i], 0, ARRAY_SIZE);
+ }
}
return cloneList;
}
@@ -92,18 +98,40 @@ public class BooleanTVList extends TVList {
public void sort() {
if (sortedTimestamps == null || sortedTimestamps.length < size) {
- sortedTimestamps = new long[size];
+ sortedTimestamps = (long[][]) PrimitiveDataListPool.getInstance().getDataListsByType(TSDataType.INT64, size);
}
if (sortedValues == null || sortedValues.length < size) {
- sortedValues = new boolean[size];
+ sortedValues = (boolean[][]) PrimitiveDataListPool.getInstance().getDataListsByType(TSDataType.BOOLEAN, size);
}
sort(0, size);
+ clearTime();
+ clearValue();
sorted = true;
}
@Override
+ void clearValue() {
+ if (values != null) {
+ for (boolean[] dataArray : values) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ values.clear();
+ }
+ }
+
+ @Override
+ void clearSortedValue() {
+ if (sortedValues != null) {
+ for (boolean[] dataArray : sortedValues) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ sortedValues = null;
+ }
+ }
+
+ @Override
protected void setFromSorted(int src, int dest) {
- set(dest, sortedTimestamps[src], sortedValues[src]);
+ set(dest, sortedTimestamps[src/ARRAY_SIZE][src%ARRAY_SIZE], sortedValues[src/ARRAY_SIZE][src%ARRAY_SIZE]);
}
protected void set(int src, int dest) {
@@ -113,8 +141,8 @@ public class BooleanTVList extends TVList {
}
protected void setToSorted(int src, int dest) {
- sortedTimestamps[dest] = getTime(src);
- sortedValues[dest] = getBoolean(src);
+ sortedTimestamps[dest/ARRAY_SIZE][dest% ARRAY_SIZE] = getTime(src);
+ sortedValues[dest/ARRAY_SIZE][dest%ARRAY_SIZE] = getBoolean(src);
}
protected void reverseRange(int lo, int hi) {
@@ -131,7 +159,8 @@ public class BooleanTVList extends TVList {
@Override
protected void expandValues() {
- values.add(new boolean[SINGLE_ARRAY_SIZE]);
+ values.add((boolean[]) PrimitiveDataListPool
+ .getInstance().getPrimitiveDataListByType(TSDataType.BOOLEAN));
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index f108e67..af712e9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -18,18 +18,22 @@
*/
package org.apache.iotdb.db.utils.datastructure;
+import static org.apache.iotdb.db.rescon.PrimitiveDataListPool.ARRAY_SIZE;
+
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.db.rescon.PrimitiveDataListPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class DoubleTVList extends TVList {
private List<double[]> values;
- private double[] sortedValues;
+ private double[][] sortedValues;
private double pivotValue;
- public DoubleTVList() {
+ DoubleTVList() {
super();
values = new ArrayList<>();
}
@@ -37,8 +41,8 @@ public class DoubleTVList extends TVList {
@Override
public void putDouble(long timestamp, double value) {
checkExpansion();
- int arrayIndex = size / SINGLE_ARRAY_SIZE;
- int elementIndex = size % SINGLE_ARRAY_SIZE;
+ int arrayIndex = size / ARRAY_SIZE;
+ int elementIndex = size % ARRAY_SIZE;
minTime = minTime <= timestamp ? minTime : timestamp;
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
@@ -51,11 +55,11 @@ public class DoubleTVList extends TVList {
throw new ArrayIndexOutOfBoundsException(index);
}
if (!sorted) {
- int arrayIndex = index / SINGLE_ARRAY_SIZE;
- int elementIndex = index % SINGLE_ARRAY_SIZE;
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
return values.get(arrayIndex)[elementIndex];
} else {
- return sortedValues[index];
+ return sortedValues[index/ARRAY_SIZE][index%ARRAY_SIZE];
}
}
@@ -63,8 +67,8 @@ public class DoubleTVList extends TVList {
if (index >= size) {
throw new ArrayIndexOutOfBoundsException(index);
}
- int arrayIndex = index / SINGLE_ARRAY_SIZE;
- int elementIndex = index % SINGLE_ARRAY_SIZE;
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
}
@@ -78,8 +82,10 @@ public class DoubleTVList extends TVList {
cloneList.values.add(cloneValue(valueArray));
}
} else {
- cloneList.sortedValues = new double[size];
- System.arraycopy(sortedValues, 0, cloneList.sortedValues, 0, size);
+ cloneList.sortedValues = new double[sortedValues.length][ARRAY_SIZE];
+ for (int i = 0; i < sortedValues.length; i++) {
+ System.arraycopy(sortedValues[i], 0, cloneList.sortedValues[i], 0, ARRAY_SIZE);
+ }
}
return cloneList;
}
@@ -92,18 +98,40 @@ public class DoubleTVList extends TVList {
public void sort() {
if (sortedTimestamps == null || sortedTimestamps.length < size) {
- sortedTimestamps = new long[size];
+ sortedTimestamps = (long[][]) PrimitiveDataListPool.getInstance().getDataListsByType(TSDataType.INT64, size);
}
if (sortedValues == null || sortedValues.length < size) {
- sortedValues = new double[size];
+ sortedValues = (double[][]) PrimitiveDataListPool.getInstance().getDataListsByType(TSDataType.DOUBLE, size);
}
sort(0, size);
+ clearTime();
+ clearValue();
sorted = true;
}
@Override
+ void clearValue() {
+ if (values != null) {
+ for (double[] dataArray : values) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ values.clear();
+ }
+ }
+
+ @Override
+ void clearSortedValue() {
+ if (sortedValues != null) {
+ for (double[] dataArray : sortedValues) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ sortedValues = null;
+ }
+ }
+
+ @Override
protected void setFromSorted(int src, int dest) {
- set(dest, sortedTimestamps[src], sortedValues[src]);
+ set(dest, sortedTimestamps[src/ARRAY_SIZE][src%ARRAY_SIZE], sortedValues[src/ARRAY_SIZE][src%ARRAY_SIZE]);
}
protected void set(int src, int dest) {
@@ -113,8 +141,8 @@ public class DoubleTVList extends TVList {
}
protected void setToSorted(int src, int dest) {
- sortedTimestamps[dest] = getTime(src);
- sortedValues[dest] = getDouble(src);
+ sortedTimestamps[dest/ARRAY_SIZE][dest% ARRAY_SIZE] = getTime(src);
+ sortedValues[dest/ARRAY_SIZE][dest%ARRAY_SIZE] = getDouble(src);
}
protected void reverseRange(int lo, int hi) {
@@ -131,7 +159,8 @@ public class DoubleTVList extends TVList {
@Override
protected void expandValues() {
- values.add(new double[SINGLE_ARRAY_SIZE]);
+ values.add((double[]) PrimitiveDataListPool
+ .getInstance().getPrimitiveDataListByType(TSDataType.DOUBLE));
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
index 0585bec..bfa1390 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
@@ -18,18 +18,22 @@
*/
package org.apache.iotdb.db.utils.datastructure;
+import static org.apache.iotdb.db.rescon.PrimitiveDataListPool.ARRAY_SIZE;
+
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.db.rescon.PrimitiveDataListPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class FloatTVList extends TVList {
private List<float[]> values;
- private float[] sortedValues;
+ private float[][] sortedValues;
private float pivotValue;
- public FloatTVList() {
+ FloatTVList() {
super();
values = new ArrayList<>();
}
@@ -37,8 +41,8 @@ public class FloatTVList extends TVList {
@Override
public void putFloat(long timestamp, float value) {
checkExpansion();
- int arrayIndex = size / SINGLE_ARRAY_SIZE;
- int elementIndex = size % SINGLE_ARRAY_SIZE;
+ int arrayIndex = size / ARRAY_SIZE;
+ int elementIndex = size % ARRAY_SIZE;
minTime = minTime <= timestamp ? minTime : timestamp;
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
@@ -51,11 +55,11 @@ public class FloatTVList extends TVList {
throw new ArrayIndexOutOfBoundsException(index);
}
if (!sorted) {
- int arrayIndex = index / SINGLE_ARRAY_SIZE;
- int elementIndex = index % SINGLE_ARRAY_SIZE;
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
return values.get(arrayIndex)[elementIndex];
} else {
- return sortedValues[index];
+ return sortedValues[index/ARRAY_SIZE][index%ARRAY_SIZE];
}
}
@@ -63,8 +67,8 @@ public class FloatTVList extends TVList {
if (index >= size) {
throw new ArrayIndexOutOfBoundsException(index);
}
- int arrayIndex = index / SINGLE_ARRAY_SIZE;
- int elementIndex = index % SINGLE_ARRAY_SIZE;
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
}
@@ -78,8 +82,10 @@ public class FloatTVList extends TVList {
cloneList.values.add(cloneValue(valueArray));
}
} else {
- cloneList.sortedValues = new float[size];
- System.arraycopy(sortedValues, 0, cloneList.sortedValues, 0, size);
+ cloneList.sortedValues = new float[sortedValues.length][ARRAY_SIZE];
+ for (int i = 0; i < sortedValues.length; i++) {
+ System.arraycopy(sortedValues[i], 0, cloneList.sortedValues[i], 0, ARRAY_SIZE);
+ }
}
return cloneList;
}
@@ -92,18 +98,40 @@ public class FloatTVList extends TVList {
public void sort() {
if (sortedTimestamps == null || sortedTimestamps.length < size) {
- sortedTimestamps = new long[size];
+ sortedTimestamps = (long[][]) PrimitiveDataListPool.getInstance().getDataListsByType(TSDataType.INT64, size);
}
if (sortedValues == null || sortedValues.length < size) {
- sortedValues = new float[size];
+ sortedValues = (float[][]) PrimitiveDataListPool.getInstance().getDataListsByType(TSDataType.FLOAT, size);
}
sort(0, size);
+ clearTime();
+ clearValue();
sorted = true;
}
@Override
+ void clearValue() {
+ if (values != null) {
+ for (float[] dataArray : values) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ values.clear();
+ }
+ }
+
+ @Override
+ void clearSortedValue() {
+ if (sortedValues != null) {
+ for (float[] dataArray : sortedValues) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ sortedValues = null;
+ }
+ }
+
+ @Override
protected void setFromSorted(int src, int dest) {
- set(dest, sortedTimestamps[src], sortedValues[src]);
+ set(dest, sortedTimestamps[src/ARRAY_SIZE][src%ARRAY_SIZE], sortedValues[src/ARRAY_SIZE][src%ARRAY_SIZE]);
}
protected void set(int src, int dest) {
@@ -113,8 +141,8 @@ public class FloatTVList extends TVList {
}
protected void setToSorted(int src, int dest) {
- sortedTimestamps[dest] = getTime(src);
- sortedValues[dest] = getFloat(src);
+ sortedTimestamps[dest/ARRAY_SIZE][dest% ARRAY_SIZE] = getTime(src);
+ sortedValues[dest/ARRAY_SIZE][dest%ARRAY_SIZE] = getFloat(src);
}
protected void reverseRange(int lo, int hi) {
@@ -131,7 +159,8 @@ public class FloatTVList extends TVList {
@Override
protected void expandValues() {
- values.add(new float[SINGLE_ARRAY_SIZE]);
+ values.add((float[]) PrimitiveDataListPool
+ .getInstance().getPrimitiveDataListByType(TSDataType.FLOAT));
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index 4da7914..54ab5dd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -18,18 +18,22 @@
*/
package org.apache.iotdb.db.utils.datastructure;
+import static org.apache.iotdb.db.rescon.PrimitiveDataListPool.ARRAY_SIZE;
+
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.db.rescon.PrimitiveDataListPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class IntTVList extends TVList {
private List<int[]> values;
- private int[] sortedValues;
+ private int[][] sortedValues;
private int pivotValue;
- public IntTVList() {
+ IntTVList() {
super();
values = new ArrayList<>();
}
@@ -37,8 +41,8 @@ public class IntTVList extends TVList {
@Override
public void putInt(long timestamp, int value) {
checkExpansion();
- int arrayIndex = size / SINGLE_ARRAY_SIZE;
- int elementIndex = size % SINGLE_ARRAY_SIZE;
+ int arrayIndex = size / ARRAY_SIZE;
+ int elementIndex = size % ARRAY_SIZE;
minTime = minTime <= timestamp ? minTime : timestamp;
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
@@ -51,11 +55,11 @@ public class IntTVList extends TVList {
throw new ArrayIndexOutOfBoundsException(index);
}
if (!sorted) {
- int arrayIndex = index / SINGLE_ARRAY_SIZE;
- int elementIndex = index % SINGLE_ARRAY_SIZE;
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
return values.get(arrayIndex)[elementIndex];
} else {
- return sortedValues[index];
+ return sortedValues[index/ARRAY_SIZE][index%ARRAY_SIZE];
}
}
@@ -63,8 +67,8 @@ public class IntTVList extends TVList {
if (index >= size) {
throw new ArrayIndexOutOfBoundsException(index);
}
- int arrayIndex = index / SINGLE_ARRAY_SIZE;
- int elementIndex = index % SINGLE_ARRAY_SIZE;
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
}
@@ -78,8 +82,10 @@ public class IntTVList extends TVList {
cloneList.values.add(cloneValue(valueArray));
}
} else {
- cloneList.sortedValues = new int[size];
- System.arraycopy(sortedValues, 0, cloneList.sortedValues, 0, size);
+ cloneList.sortedValues = new int[sortedValues.length][ARRAY_SIZE];
+ for (int i = 0; i < sortedValues.length; i++) {
+ System.arraycopy(sortedValues[i], 0, cloneList.sortedValues[i], 0, ARRAY_SIZE);
+ }
}
return cloneList;
}
@@ -92,18 +98,40 @@ public class IntTVList extends TVList {
public void sort() {
if (sortedTimestamps == null || sortedTimestamps.length < size) {
- sortedTimestamps = new long[size];
+ sortedTimestamps = (long[][]) PrimitiveDataListPool.getInstance().getDataListsByType(TSDataType.INT64, size);
}
if (sortedValues == null || sortedValues.length < size) {
- sortedValues = new int[size];
+ sortedValues = (int[][]) PrimitiveDataListPool.getInstance().getDataListsByType(TSDataType.INT32, size);
}
sort(0, size);
+ clearTime();
+ clearValue();
sorted = true;
}
@Override
+ void clearValue() {
+ if (values != null) {
+ for (int[] dataArray : values) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ values.clear();
+ }
+ }
+
+ @Override
+ void clearSortedValue() {
+ if (sortedValues != null) {
+ for (int[] dataArray : sortedValues) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ sortedValues = null;
+ }
+ }
+
+ @Override
protected void setFromSorted(int src, int dest) {
- set(dest, sortedTimestamps[src], sortedValues[src]);
+ set(dest, sortedTimestamps[src/ARRAY_SIZE][src%ARRAY_SIZE], sortedValues[src/ARRAY_SIZE][src%ARRAY_SIZE]);
}
protected void set(int src, int dest) {
@@ -113,8 +141,8 @@ public class IntTVList extends TVList {
}
protected void setToSorted(int src, int dest) {
- sortedTimestamps[dest] = getTime(src);
- sortedValues[dest] = getInt(src);
+ sortedTimestamps[dest/ARRAY_SIZE][dest% ARRAY_SIZE] = getTime(src);
+ sortedValues[dest/ARRAY_SIZE][dest%ARRAY_SIZE] = getInt(src);
}
protected void reverseRange(int lo, int hi) {
@@ -131,7 +159,8 @@ public class IntTVList extends TVList {
@Override
protected void expandValues() {
- values.add(new int[SINGLE_ARRAY_SIZE]);
+ values.add((int[]) PrimitiveDataListPool
+ .getInstance().getPrimitiveDataListByType(TSDataType.INT32));
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index f9a8704..0edd86b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -18,18 +18,22 @@
*/
package org.apache.iotdb.db.utils.datastructure;
+import static org.apache.iotdb.db.rescon.PrimitiveDataListPool.ARRAY_SIZE;
+
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.db.rescon.PrimitiveDataListPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class LongTVList extends TVList {
private List<long[]> values;
- private long[] sortedValues;
+ private long[][] sortedValues;
private long pivotValue;
- public LongTVList() {
+ LongTVList() {
super();
values = new ArrayList<>();
}
@@ -37,8 +41,8 @@ public class LongTVList extends TVList {
@Override
public void putLong(long timestamp, long value) {
checkExpansion();
- int arrayIndex = size / SINGLE_ARRAY_SIZE;
- int elementIndex = size % SINGLE_ARRAY_SIZE;
+ int arrayIndex = size / ARRAY_SIZE;
+ int elementIndex = size % ARRAY_SIZE;
minTime = minTime <= timestamp ? minTime : timestamp;
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
@@ -51,11 +55,11 @@ public class LongTVList extends TVList {
throw new ArrayIndexOutOfBoundsException(index);
}
if (!sorted) {
- int arrayIndex = index / SINGLE_ARRAY_SIZE;
- int elementIndex = index % SINGLE_ARRAY_SIZE;
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
return values.get(arrayIndex)[elementIndex];
} else {
- return sortedValues[index];
+ return sortedValues[index/ARRAY_SIZE][index%ARRAY_SIZE];
}
}
@@ -63,8 +67,8 @@ public class LongTVList extends TVList {
if (index >= size) {
throw new ArrayIndexOutOfBoundsException(index);
}
- int arrayIndex = index / SINGLE_ARRAY_SIZE;
- int elementIndex = index % SINGLE_ARRAY_SIZE;
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
}
@@ -78,8 +82,10 @@ public class LongTVList extends TVList {
cloneList.values.add(cloneValue(valueArray));
}
} else {
- cloneList.sortedValues = new long[size];
- System.arraycopy(sortedValues, 0, cloneList.sortedValues, 0, size);
+ cloneList.sortedValues = new long[sortedValues.length][ARRAY_SIZE];
+ for (int i = 0; i < sortedValues.length; i++) {
+ System.arraycopy(sortedValues[i], 0, cloneList.sortedValues[i], 0, ARRAY_SIZE);
+ }
}
return cloneList;
}
@@ -92,18 +98,40 @@ public class LongTVList extends TVList {
public void sort() {
if (sortedTimestamps == null || sortedTimestamps.length < size) {
- sortedTimestamps = new long[size];
+ sortedTimestamps = (long[][]) PrimitiveDataListPool.getInstance().getDataListsByType(TSDataType.INT64, size);
}
if (sortedValues == null || sortedValues.length < size) {
- sortedValues = new long[size];
+ sortedValues = (long[][]) PrimitiveDataListPool.getInstance().getDataListsByType(TSDataType.INT64, size);
}
sort(0, size);
+ clearTime();
+ clearValue();
sorted = true;
}
@Override
+ void clearValue() {
+ if (values != null) {
+ for (long[] dataArray : values) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ values.clear();
+ }
+ }
+
+ @Override
+ void clearSortedValue() {
+ if (sortedValues != null) {
+ for (long[] dataArray : sortedValues) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ sortedValues = null;
+ }
+ }
+
+ @Override
protected void setFromSorted(int src, int dest) {
- set(dest, sortedTimestamps[src], sortedValues[src]);
+ set(dest, sortedTimestamps[src/ARRAY_SIZE][src%ARRAY_SIZE], sortedValues[src/ARRAY_SIZE][src%ARRAY_SIZE]);
}
protected void set(int src, int dest) {
@@ -113,8 +141,8 @@ public class LongTVList extends TVList {
}
protected void setToSorted(int src, int dest) {
- sortedTimestamps[dest] = getTime(src);
- sortedValues[dest] = getLong(src);
+ sortedTimestamps[dest/ARRAY_SIZE][dest% ARRAY_SIZE] = getTime(src);
+ sortedValues[dest/ARRAY_SIZE][dest%ARRAY_SIZE] = getLong(src);
}
protected void reverseRange(int lo, int hi) {
@@ -131,7 +159,8 @@ public class LongTVList extends TVList {
@Override
protected void expandValues() {
- values.add(new long[SINGLE_ARRAY_SIZE]);
+ values.add((long[]) PrimitiveDataListPool
+ .getInstance().getPrimitiveDataListByType(TSDataType.INT64));
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 62f9702..f9462cf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -19,8 +19,11 @@
package org.apache.iotdb.db.utils.datastructure;
+import static org.apache.iotdb.db.rescon.PrimitiveDataListPool.ARRAY_SIZE;
+
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.db.rescon.PrimitiveDataListPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -30,13 +33,11 @@ public abstract class TVList {
private static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent";
protected static final int SMALL_ARRAY_LENGTH = 32;
- protected static final int SINGLE_ARRAY_SIZE = 512;
protected List<long[]> timestamps;
protected int size;
- protected int limit;
- protected long[] sortedTimestamps;
+ protected long[][] sortedTimestamps;
protected boolean sorted = false;
private long timeOffset = Long.MIN_VALUE;
@@ -48,7 +49,6 @@ public abstract class TVList {
public TVList() {
timestamps = new ArrayList<>();
size = 0;
- limit = 0;
minTime = Long.MIN_VALUE;
}
@@ -61,11 +61,11 @@ public abstract class TVList {
throw new ArrayIndexOutOfBoundsException(index);
}
if (!sorted) {
- int arrayIndex = index / SINGLE_ARRAY_SIZE;
- int elementIndex = index % SINGLE_ARRAY_SIZE;
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
return timestamps.get(arrayIndex)[elementIndex];
} else {
- return sortedTimestamps[index];
+ return sortedTimestamps[index/ARRAY_SIZE][index%ARRAY_SIZE];
}
}
@@ -141,27 +141,54 @@ public abstract class TVList {
cloneList.timestamps.add(cloneTime(timestampArray));
}
} else {
- cloneList.sortedTimestamps = new long[size];
- System.arraycopy(sortedTimestamps, 0, cloneList.sortedTimestamps, 0, size);
+ cloneList.sortedTimestamps = new long[sortedTimestamps.length][ARRAY_SIZE];
+ for (int i = 0; i < sortedTimestamps.length; i++) {
+ System.arraycopy(sortedTimestamps[i], 0, cloneList.sortedTimestamps[i], 0, ARRAY_SIZE);
+ }
}
cloneList.size = size;
cloneList.sorted = sorted;
- cloneList.limit = limit;
cloneList.minTime = minTime;
}
- public void reset() {
+ public void clear() {
size = 0;
- timeOffset = -1;
+ timeOffset = Long.MIN_VALUE;
sorted = false;
minTime = Long.MIN_VALUE;
+ clearTime();
+ clearSortedTime();
+
+ clearValue();
+ clearSortedValue();
+ }
+
+ protected void clearTime() {
+ if (timestamps != null) {
+ for (long[] dataArray : timestamps) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ timestamps.clear();
+ }
}
+ protected void clearSortedTime() {
+ if (sortedTimestamps != null) {
+ for (long[] dataArray : sortedTimestamps) {
+ PrimitiveDataListPool.getInstance().release(dataArray);
+ }
+ sortedTimestamps = null;
+ }
+ }
+
+ abstract void clearValue();
+
+ abstract void clearSortedValue();
+
protected void checkExpansion() {
- if ((size == limit) && (size % SINGLE_ARRAY_SIZE) == 0) {
+ if ((size % ARRAY_SIZE) == 0) {
expandValues();
- timestamps.add(new long[SINGLE_ARRAY_SIZE]);
- limit += SINGLE_ARRAY_SIZE;
+ timestamps.add((long[]) PrimitiveDataListPool.getInstance().getPrimitiveDataListByType(TSDataType.INT64));
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVListAllocator.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVListAllocator.java
index ce2896c..b1aeea5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVListAllocator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVListAllocator.java
@@ -43,12 +43,12 @@ public class TVListAllocator {
}
public synchronized void release(TSDataType dataType, TVList list) {
- list.reset();
+ list.clear();
tvListCache.get(dataType).add(list);
}
public synchronized void release(TVList list) {
- list.reset();
+ list.clear();
if (list instanceof BinaryTVList) {
tvListCache.get(TSDataType.TEXT).add(list);
} else if (list instanceof BooleanTVList) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java
deleted file mode 100644
index 68b61cd..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memcontrol;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Consumer;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
-import org.apache.iotdb.db.engine.MetadataManagerHelper;
-import org.apache.iotdb.db.engine.PathUtils;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.utils.FileSchemaUtils;
-import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class BufferwriteFileSizeControlTest {
-
- Action bfflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
-
- }
- };
-
- Action bfcloseaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
- Consumer<BufferWriteProcessor> bfcloseConsumer = bfProcessor -> {};
-
- Action fnflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
-
- }
- };
-
- BufferWriteProcessor processor = null;
- String nsp = "root.vehicle.d0";
- String nsp2 = "root.vehicle.d1";
-
- private boolean cachePageData = false;
- private int groupSizeInByte;
- private int pageCheckSizeThreshold;
- private int pageSizeInByte;
- private int maxStringLength;
- private long fileSizeThreshold;
- private long memMonitorInterval;
- private TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
- private IoTDBConfig dbConfig = IoTDBDescriptor.getInstance().getConfig();
-
- private boolean skip = !false;
-
- @Before
- public void setUp() throws Exception {
- // origin value
- groupSizeInByte = TsFileConf.groupSizeInByte;
- pageCheckSizeThreshold = TsFileConf.pageCheckSizeThreshold;
- pageSizeInByte = TsFileConf.pageSizeInByte;
- maxStringLength = TsFileConf.maxStringLength;
- fileSizeThreshold = dbConfig.getBufferwriteFileSizeThreshold();
- memMonitorInterval = dbConfig.getMemMonitorInterval();
- // new value
- TsFileConf.groupSizeInByte = 200000;
- TsFileConf.pageCheckSizeThreshold = 3;
- TsFileConf.pageSizeInByte = 10000;
- TsFileConf.maxStringLength = 2;
- dbConfig.setBufferwriteFileSizeThreshold(5 * 1024 * 1024);
- BasicMemController.getInstance().setCheckInterval(600 * 1000);
- // init metadata
- MetadataManagerHelper.initMetadata();
- }
-
- @After
- public void tearDown() throws Exception {
- // recovery value
- TsFileConf.groupSizeInByte = groupSizeInByte;
- TsFileConf.pageCheckSizeThreshold = pageCheckSizeThreshold;
- TsFileConf.pageSizeInByte = pageSizeInByte;
- TsFileConf.maxStringLength = maxStringLength;
- dbConfig.setBufferwriteFileSizeThreshold(fileSizeThreshold);
- BasicMemController.getInstance().setCheckInterval(memMonitorInterval);
- // clean environment
- EnvironmentUtils.cleanEnv();
- }
-
- @Test
- public void test() throws BufferWriteProcessorException, WriteProcessException {
- if (skip) {
- return;
- }
- String filename = "bufferwritetest";
- new File(filename).delete();
-
- Map<String, Action> parameters = new HashMap<>();
- parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
- //parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bfcloseaction);
- parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
-
- try {
- processor = new BufferWriteProcessor(DirectoryManager.getInstance().getTsFolderForTest(), nsp,
- filename,
- parameters, bfcloseConsumer, SysTimeVersionController.INSTANCE,
- FileSchemaUtils.constructFileSchema(nsp));
- } catch (BufferWriteProcessorException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- File nspdir = PathUtils.getBufferWriteDir(nsp);
- assertEquals(true, nspdir.isDirectory());
- for (int i = 0; i < 1000000; i++) {
- processor.write(nsp, "s1", i * i, TSDataType.INT64, i + "");
- processor.write(nsp2, "s1", i * i, TSDataType.INT64, i + "");
- if (i % 100000 == 0) {
- System.out.println(i + "," + MemUtils.bytesCntToStr(processor.getFileSize()));
- }
- }
- // wait to flush end
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- processor.close();
- assertTrue(processor.getFileSize() < dbConfig.getBufferwriteFileSizeThreshold());
- fail("Method unimplemented");
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java
deleted file mode 100644
index 3e6dac1..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memcontrol;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
-import org.apache.iotdb.db.engine.MetadataManagerHelper;
-import org.apache.iotdb.db.engine.PathUtils;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.utils.FileSchemaUtils;
-import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class BufferwriteMetaSizeControlTest {
-
- Action bfflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
-
- }
- };
-
- Action bfcloseaction = new Action() {
-
- @Override
- public void act() throws ActionException {
- }
- };
-
-
- Action fnflushaction = new Action() {
-
- @Override
- public void act() throws ActionException {
-
- }
- };
-
- BufferWriteProcessor processor = null;
- String nsp = "root.vehicle.d0";
- String nsp2 = "root.vehicle.d1";
-
- private boolean cachePageData = false;
- private int groupSizeInByte;
- private int pageCheckSizeThreshold;
- private int pageSizeInByte;
- private int maxStringLength;
- private long metaSizeThreshold;
- private long memMonitorInterval;
- private TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
- private IoTDBConfig dbConfig = IoTDBDescriptor.getInstance().getConfig();
-
- private boolean skip = !false;
-
- @Before
- public void setUp() throws Exception {
- // origin value
- groupSizeInByte = TsFileConf.groupSizeInByte;
- pageCheckSizeThreshold = TsFileConf.pageCheckSizeThreshold;
- pageSizeInByte = TsFileConf.pageSizeInByte;
- maxStringLength = TsFileConf.maxStringLength;
- metaSizeThreshold = dbConfig.getBufferwriteFileSizeThreshold();
- memMonitorInterval = dbConfig.getMemMonitorInterval();
- // new value
- TsFileConf.groupSizeInByte = 200000;
- TsFileConf.pageCheckSizeThreshold = 3;
- TsFileConf.pageSizeInByte = 10000;
- TsFileConf.maxStringLength = 2;
- dbConfig.setBufferwriteMetaSizeThreshold(1024 * 1024);
- BasicMemController.getInstance().setCheckInterval(600 * 1000);
- // init metadata
- MetadataManagerHelper.initMetadata();
- }
-
- @After
- public void tearDown() throws Exception {
- // recovery value
- TsFileConf.groupSizeInByte = groupSizeInByte;
- TsFileConf.pageCheckSizeThreshold = pageCheckSizeThreshold;
- TsFileConf.pageSizeInByte = pageSizeInByte;
- TsFileConf.maxStringLength = maxStringLength;
- dbConfig.setBufferwriteMetaSizeThreshold(metaSizeThreshold);
- BasicMemController.getInstance().setCheckInterval(memMonitorInterval);
- // clean environment
- EnvironmentUtils.cleanEnv();
- }
-
- @Test
- public void test() throws BufferWriteProcessorException, WriteProcessException {
- if (skip) {
- return;
- }
- String filename = "bufferwritetest";
- new File(filename).delete();
-
- Map<String, Action> parameters = new HashMap<>();
- parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bfflushaction);
- parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fnflushaction);
-
- try {
- processor = new BufferWriteProcessor(DirectoryManager.getInstance().getTsFolderForTest(), nsp,
- filename,
- parameters, x->{}, SysTimeVersionController.INSTANCE, FileSchemaUtils.constructFileSchema(nsp));
- } catch (BufferWriteProcessorException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- File nspdir = PathUtils.getBufferWriteDir(nsp);
- assertEquals(true, nspdir.isDirectory());
- for (int i = 0; i < 1000000; i++) {
- processor.write(nsp, "s1", i * i, TSDataType.INT64, i + "");
- processor.write(nsp2, "s1", i * i, TSDataType.INT64, i + "");
- if (i % 100000 == 0) {
- System.out.println(i + "," + MemUtils.bytesCntToStr(processor.getMetaSize()));
- }
- }
- // wait to flush end
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- assertTrue(processor.getMetaSize() < dbConfig.getBufferwriteFileSizeThreshold());
- processor.close();
- fail("Method unimplemented");
-
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index 28596b0..18c71b6 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -25,6 +25,7 @@ import java.util.Random;
import org.apache.iotdb.db.utils.MathUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TsPrimitiveType;
+import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -46,7 +47,7 @@ public class PrimitiveMemTableTest {
@Test
public void memSeriesCloneTest() {
TSDataType dataType = TSDataType.INT32;
- WritableMemChunk series = new WritableMemChunk(dataType);
+ WritableMemChunkV2 series = new WritableMemChunkV2(dataType, TVList.newList(dataType));
int count = 1000;
for (int i = 0; i < count; i++) {
series.write(i, String.valueOf(i));
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java
index e193f0a..863a0cc 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
public class IoTDBDeletionIT {
@@ -117,6 +118,7 @@ public class IoTDBDeletionIT {
cleanData();
}
+ @Ignore
@Test
public void testMerge() throws SQLException, InterruptedException {
prepareMerge();
@@ -136,7 +138,6 @@ public class IoTDBDeletionIT {
assertEquals(5000, cnt);
set.close();
- Thread.sleep(5000);
// after merge completes
set = statement.executeQuery("SELECT * FROM root.vehicle.d0");
cnt = 0;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 8a56b42..b7d3f9c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -88,8 +88,8 @@ public class EnvironmentUtils {
MManager.getInstance().flushObjectToFile();
// delete all directory
cleanAllDir();
- // FileNodeManagerV2.getInstance().reset();
- // reset MemController
+ // FileNodeManagerV2.getInstance().clear();
+ // clear MemController
BasicMemController.getInstance().close();
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/PrimitiveArrayListV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/PrimitiveArrayListV2Test.java
deleted file mode 100644
index 7795af5..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/PrimitiveArrayListV2Test.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.utils;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class PrimitiveArrayListV2Test {
-
- public static void printMemUsed() {
- Runtime.getRuntime().gc();
- long size = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
- int gb = (int) (size / 1024 / 1024 / 1024);
- int mb = (int) (size / 1024 / 1024 - gb * 1024);
- int kb = (int) (size / 1024 - gb * 1024 * 1024 - mb * 1024);
- int b = (int) (size - gb * 1024 * 1024 * 1024 - mb * 1024 * 1024 - kb * 1024);
- System.out.println("Mem Used:" + gb + "GB, " + mb + "MB, " + kb + "KB, " + b + "B");
- }
-
- @Test
- public void testPutAndGet() {
-
- long timestamp = System.currentTimeMillis();
- int count = 10000;
- PrimitiveArrayListV2 primitiveArrayList = new PrimitiveArrayListV2(int.class);
- for (int i = 0; i < count; i++) {
- primitiveArrayList.putTimestamp(i, i);
- }
-
- for (int i = 0; i < count; i++) {
- int v = (int) primitiveArrayList.getValue(i);
- Assert.assertEquals((long) i, primitiveArrayList.getTimestamp(i));
- Assert.assertEquals(i, v);
- }
- printMemUsed();
- System.out.println("time consume: " + (System.currentTimeMillis() - timestamp) + "ms" );
- }
-
- @Test
- public void testReset1() {
- int count = 10000;
- PrimitiveArrayListV2 primitiveArrayList = new PrimitiveArrayListV2(int.class);
- for (int i = 0; i < count; i++) {
- primitiveArrayList.putTimestamp(i, i);
- }
- primitiveArrayList.reset();
-
- count = 1000;
- for (int i = 0; i < count; i++) {
- primitiveArrayList.putTimestamp(i, i + 10000);
- }
-
- assert count == primitiveArrayList.getTotalDataNumber();
- for (int i = 0; i < count; i++) {
- int v = (int) primitiveArrayList.getValue(i);
- Assert.assertEquals((long) i, primitiveArrayList.getTimestamp(i));
- Assert.assertEquals(i + 10000, v);
- }
- printMemUsed();
- }
-
- @Test
- public void testReset2() {
- int count = 1000;
- PrimitiveArrayListV2 primitiveArrayList = new PrimitiveArrayListV2(int.class);
- for (int i = 0; i < count; i++) {
- primitiveArrayList.putTimestamp(i, i);
- }
- primitiveArrayList.reset();
-
- count = 100000;
- for (int i = 0; i < count; i++) {
- primitiveArrayList.putTimestamp(i, i + 1000);
- }
-
- assert count == primitiveArrayList.getTotalDataNumber();
- for (int i = 0; i < count; i++) {
- int v = (int) primitiveArrayList.getValue(i);
- Assert.assertEquals((long) i, primitiveArrayList.getTimestamp(i));
- Assert.assertEquals(i + 1000, v);
- }
- printMemUsed();
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/PrimitiveDataListPoolTest.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/PrimitiveDataListPoolTest.java
deleted file mode 100644
index 4996cd1..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/PrimitiveDataListPoolTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.utils;
-
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class PrimitiveDataListPoolTest {
-
- @Test
- public void getPrimitiveDataListByDataType() {
- }
-
- @Test
- public void release() {
- int loop = 100;
- while(loop-- > 0){
- PrimitiveArrayListV2 primitiveArrayList = PrimitiveDataListPool.getInstance().getPrimitiveDataListByDataType(
- TSDataType.INT32);
- testPutAndGet(primitiveArrayList, loop * 100);
- PrimitiveDataListPool.getInstance().release(primitiveArrayList);
- assert PrimitiveDataListPool.getInstance().getPrimitiveDataListSizeByDataType(TSDataType.INT32) == 1;
- }
- }
-
- public void testPutAndGet(PrimitiveArrayListV2 primitiveArrayList, int count) {
- for (int i = 0; i < count; i++) {
- primitiveArrayList.putTimestamp(i, i);
- }
-
- assert count == primitiveArrayList.getTotalDataNumber();
-
- for (int i = 0; i < count; i++) {
- int v = (int) primitiveArrayList.getValue(i);
- Assert.assertEquals((long) i, primitiveArrayList.getTimestamp(i));
- Assert.assertEquals(i, v);
- }
- }
-}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/datastructure/LongTVListTest.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/datastructure/LongTVListTest.java
index 3d5fe26..89be610 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/datastructure/LongTVListTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/datastructure/LongTVListTest.java
@@ -21,11 +21,8 @@ package org.apache.iotdb.db.utils.datastructure;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-import org.apache.iotdb.db.engine.memtable.DeduplicatedSortedData;
-import org.apache.iotdb.db.engine.memtable.WritableMemChunk;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TsPrimitiveType.TsLong;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.Assert;
import org.junit.Test;
@@ -99,40 +96,4 @@ public class LongTVListTest {
}
-
- @Test
- public void compareGetSortedTimeValuePairTime() {
- long time1 = System.currentTimeMillis();
- for (int j = 0; j < 100; j++) {
- WritableMemChunk writableMemChunk = new WritableMemChunk(TSDataType.INT64);
- for (long i = 0; i < 1000; i++) {
- writableMemChunk.putLong(i, i);
- }
- List<TimeValuePair> timeValuePairs = writableMemChunk.getSortedTimeValuePairList();
- for (int i = 0; i < timeValuePairs.size(); i++) {
- timeValuePairs.get(i);
- }
- }
- time1 = System.currentTimeMillis() - time1;
- System.out.println("writable memchunk getSortedTimeValuePairList time: " + time1);
-
- }
-
- @Test
- public void compareGetDeduplicatedDataTime() {
- long time2 = System.currentTimeMillis();
- for (int j = 0; j < 100; j++) {
- WritableMemChunk writableMemChunk = new WritableMemChunk(TSDataType.INT64);
- for (long i = 0; i < 1000; i++) {
- writableMemChunk.putLong(i, i);
- }
- DeduplicatedSortedData deduplicatedSortedData = writableMemChunk.getDeduplicatedSortedData();
- while (deduplicatedSortedData.hasNext()) {
- deduplicatedSortedData.next();
- }
- }
- time2 = System.currentTimeMillis() - time2;
- System.out.println("writable memchunk getDeduplicatedSortedData time: " + time2);
- }
-
}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index 6a0be73..9d662e4 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -27,7 +27,6 @@ import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
@@ -40,7 +39,6 @@ import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;