You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/05/14 04:08:01 UTC
[incubator-iotdb] branch refactor_overflow updated: partially
complete TsFileProcessor
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch refactor_overflow
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/refactor_overflow by this push:
new 817f141 partially complete TsFileProcessor
817f141 is described below
commit 817f14157dd607c9f2239062ffb8e5aa8f4ab2de
Author: 江天 <jt...@163.com>
AuthorDate: Tue May 14 12:06:27 2019 +0800
partially complete TsFileProcessor
---
.../java/org/apache/iotdb/db/engine/Processor.java | 3 +-
.../engine/bufferwrite/BufferWriteProcessor.java | 7 +-
.../db/engine/filenode/FileNodeProcessor.java | 32 +--
.../db/engine/overflow/io/OverflowProcessor.java | 10 +-
.../db/engine/overflowdata/OverflowProcessor.java | 28 +--
.../db/engine/querycontext/QueryDataSource.java | 6 +-
...SeriesDataSource.java => SeriesDataSource.java} | 4 +-
.../iotdb/db/engine/sgmanager/OperationResult.java | 4 +-
.../db/engine/sgmanager/StorageGroupProcessor.java | 105 ++++----
.../db/engine/tsfiledata/TsFileProcessor.java | 270 ++++++++++++---------
.../iotdb/db/exception/TooManyChunksException.java | 19 ++
.../TsFileProcessorException.java} | 27 ++-
.../db/query/control/QueryResourceManager.java | 4 +-
.../query/reader/sequence/SequenceDataReader.java | 6 +-
.../sequence/SequenceDataReaderByTimestamp.java | 4 +-
.../writelog/manager/MultiFileLogNodeManager.java | 7 +-
.../db/writelog/manager/WriteLogNodeManager.java | 3 +-
.../db/writelog/node/ExclusiveWriteLogNode.java | 6 +-
.../recover/ExclusiveLogRecoverPerformer.java | 38 +--
19 files changed, 304 insertions(+), 279 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
index 5c515ff..223523a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
@@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
/**
* Processor is used for implementing different processor with different operation.<br>
@@ -185,7 +186,7 @@ public abstract class Processor {
* @throws IOException
* @throws ProcessorException
*/
- public abstract void close() throws ProcessorException;
+ public abstract void close() throws TsFileProcessorException;
public abstract long memoryUsage();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 1809cfb..b99d0a2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.engine.pool.FlushManager;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.utils.ImmediateFuture;
import org.apache.iotdb.db.utils.MemUtils;
@@ -399,7 +400,7 @@ public class BufferWriteProcessor extends Processor {
}
@Override
- public void close() throws BufferWriteProcessorException {
+ public void close() throws TsFileProcessorException {
if (isClosed) {
return;
}
@@ -431,11 +432,11 @@ public class BufferWriteProcessor extends Processor {
} catch (IOException e) {
LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.",
getProcessorName(), e);
- throw new BufferWriteProcessorException(e);
+ throw new TsFileProcessorException(e);
} catch (Exception e) {
LOGGER
.error("Failed to close the bufferwrite processor when calling the action function.", e);
- throw new BufferWriteProcessorException(e);
+ throw new TsFileProcessorException(e);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 209c1f1..acaea24 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -58,7 +58,7 @@ import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
import org.apache.iotdb.db.engine.pool.MergeManager;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -72,6 +72,7 @@ import org.apache.iotdb.db.exception.FileNodeProcessorException;
import org.apache.iotdb.db.exception.OverflowProcessorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.IStatistic;
import org.apache.iotdb.db.monitor.MonitorConstants;
@@ -143,7 +144,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
*/
private long lastMergeTime = -1;
private BufferWriteProcessor bufferWriteProcessor = null;
- private OverflowProcessor overflowProcessor = null;
+ private OverflowProcessor overflowProcessor1 = null;
private Set<Integer> oldMultiPassTokenSet = null;
private Set<Integer> newMultiPassTokenSet = new HashSet<>();
@@ -484,12 +485,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// restore the overflow processor
LOGGER.info("The filenode processor {} will recovery the overflow processor.",
getProcessorName());
- parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
- parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
try {
- overflowProcessor = new OverflowProcessor(getProcessorName(), parameters, fileSchema,
- versionController);
- } catch (IOException e) {
+ overflowProcessor1 = new OverflowProcessor(getProcessorName(), overflowFlushAction,
+ flushFileNodeProcessorAction, ()->{}, versionController, fileSchema);
+ } catch (TsFileProcessorException | IOException e) {
LOGGER.error("The filenode processor {} failed to recovery the overflow processor.",
getProcessorName());
throw new FileNodeProcessorException(e);
@@ -554,19 +553,20 @@ public class FileNodeProcessor extends Processor implements IStatistic {
/**
* get overflow processor by processor name.
*/
- public OverflowProcessor getOverflowProcessor(String processorName) throws IOException {
- if (overflowProcessor == null) {
+ public OverflowProcessor getOverflowProcessor(String processorName)
+ throws IOException, TsFileProcessorException {
+ if (overflowProcessor1 == null) {
Map<String, Action> params = new HashMap<>();
// construct processor or restore
params.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
params
.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
- overflowProcessor = new OverflowProcessor(processorName, params, fileSchema,
- versionController);
- } else if (overflowProcessor.isClosed()) {
- overflowProcessor.reopen();
+ overflowProcessor1 = new OverflowProcessor(getProcessorName(), overflowFlushAction,
+ flushFileNodeProcessorAction, ()->{}, versionController, fileSchema);
+ } else if (overflowProcessor1.isClosed()) {
+ overflowProcessor1.reopen();
}
- return overflowProcessor;
+ return overflowProcessor1;
}
/**
@@ -833,10 +833,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
unsealedTsFile.setTimeSeriesChunkMetaDatas(bufferwritedata.right);
}
- GlobalSortedSeriesDataSource globalSortedSeriesDataSource = new GlobalSortedSeriesDataSource(
+ SeriesDataSource seriesDataSource = new SeriesDataSource(
new Path(deviceId + "." + measurementId), bufferwriteDataInFiles, unsealedTsFile,
bufferwritedata.left);
- return new QueryDataSource(globalSortedSeriesDataSource, overflowSeriesDataSource);
+ return new QueryDataSource(seriesDataSource, overflowSeriesDataSource);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 9d04990..f052876 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -50,6 +50,7 @@ import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.OverflowProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
@@ -122,8 +123,7 @@ public class OverflowProcessor extends Processor {
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
logNode = MultiFileLogNodeManager.getInstance().getNode(
processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX,
- getOverflowRestoreFile(),
- FileNodeManager.getInstance().getRestoreFilePath(processorName));
+ getOverflowRestoreFile());
}
}
@@ -587,7 +587,7 @@ public class OverflowProcessor extends Processor {
}
@Override
- public void close() throws OverflowProcessorException {
+ public void close() throws TsFileProcessorException {
if (isClosed) {
return;
}
@@ -602,7 +602,7 @@ public class OverflowProcessor extends Processor {
getProcessorName(), e);
Thread.currentThread().interrupt();
} catch (IOException e) {
- throw new OverflowProcessorException(e);
+ throw new TsFileProcessorException(e);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("The overflow processor {} ends close operation.", getProcessorName());
@@ -618,7 +618,7 @@ public class OverflowProcessor extends Processor {
try {
clear();
} catch (IOException e) {
- throw new OverflowProcessorException(e);
+ throw new TsFileProcessorException(e);
}
isClosed = true;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
index 22f28dd..eae9799 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
@@ -20,25 +20,12 @@
package org.apache.iotdb.db.engine.overflowdata;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
-import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
-import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.sgmanager.OperationResult;
import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
public class OverflowProcessor extends TsFileProcessor {
@@ -48,16 +35,12 @@ public class OverflowProcessor extends TsFileProcessor {
*
* @param processorName processor name
* @param fileSchemaRef file schema
- * @throws BufferWriteProcessorException BufferWriteProcessorException
+ * @throws TsFileProcessorException TsFileProcessorException
*/
- public OverflowProcessor(String processorName,
- Action beforeFlushAction,
- Action afterFlushAction,
- Action afterCloseAction,
- VersionController versionController,
+ public OverflowProcessor(String processorName, VersionController versionController,
FileSchema fileSchemaRef)
- throws BufferWriteProcessorException, IOException {
- super(processorName, beforeFlushAction, afterFlushAction, afterCloseAction, versionController,
+ throws TsFileProcessorException, IOException {
+ super(processorName, versionController,
fileSchemaRef);
}
@@ -80,5 +63,4 @@ public class OverflowProcessor extends TsFileProcessor {
protected String getLogSuffix() {
return IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX;
}
-
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
index f63915e..84dac5f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
@@ -21,18 +21,18 @@ package org.apache.iotdb.db.engine.querycontext;
public class QueryDataSource {
// sequence data source
- private GlobalSortedSeriesDataSource seriesDataSource;
+ private SeriesDataSource seriesDataSource;
// unSequence data source
private OverflowSeriesDataSource overflowSeriesDataSource;
- public QueryDataSource(GlobalSortedSeriesDataSource seriesDataSource,
+ public QueryDataSource(SeriesDataSource seriesDataSource,
OverflowSeriesDataSource overflowSeriesDataSource) {
this.seriesDataSource = seriesDataSource;
this.overflowSeriesDataSource = overflowSeriesDataSource;
}
- public GlobalSortedSeriesDataSource getSeqDataSource() {
+ public SeriesDataSource getSeqDataSource() {
return seriesDataSource;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/SeriesDataSource.java
similarity index 94%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/SeriesDataSource.java
index 176a30e..933c0bc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/SeriesDataSource.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.tsfile.read.common.Path;
-public class GlobalSortedSeriesDataSource {
+public class SeriesDataSource {
private Path seriesPath;
@@ -35,7 +35,7 @@ public class GlobalSortedSeriesDataSource {
// seq mem-table
private ReadOnlyMemChunk readableChunk;
- public GlobalSortedSeriesDataSource(Path seriesPath, List<TsFileResource> sealedTsFiles,
+ public SeriesDataSource(Path seriesPath, List<TsFileResource> sealedTsFiles,
UnsealedTsFile unsealedTsFile,
ReadOnlyMemChunk readableChunk) {
this.seriesPath = seriesPath;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java
index 27ce0ff..1a2deb0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java
@@ -27,5 +27,7 @@ public enum OperationResult {
//the write operation is reject because of the timestamp is not allowed
WRITE_REJECT_BY_TIME,
//the write operation is reject because there is no available memory
- WRITE_REJECT_BY_MEM;
+ WRITE_REJECT_BY_MEM,
+ //attempts to write a closed FileProcessor
+ WRITE_REJECT_BY_CLOSED_PROCESSOR,
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
index ed2fdca..07674f5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
@@ -20,35 +20,21 @@
package org.apache.iotdb.db.engine.sgmanager;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.Processor;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStore;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
-import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.overflowdata.OverflowProcessor;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
-import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
+import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.FileNodeProcessorException;
-import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.IStatistic;
import org.apache.iotdb.db.monitor.MonitorConstants;
@@ -56,13 +42,8 @@ import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -73,23 +54,19 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
private static final Logger LOGGER = LoggerFactory.getLogger(StorageGroupProcessor.class);
- TsFileProcessor tsFileProcessor;
- OverflowProcessor overflowProcessor;
+ private TsFileProcessor tsFileProcessor;
+ private OverflowProcessor overflowProcessor;
//the version controller is shared by tsfile and overflow processor.
private VersionController versionController;
private FileSchema fileSchema;
-
- Action beforeFlushAction = () -> {};
- Action afterFlushAction = () -> {};
- Action afterCloseAction = () -> {};
-
/**
* Construct processor using name space seriesPath
*/
+ @SuppressWarnings("ResultOfMethodCallIgnored")
public StorageGroupProcessor(String processorName)
- throws IOException, BufferWriteProcessorException, WriteProcessException, FileNodeProcessorException {
+ throws IOException, WriteProcessException, TsFileProcessorException {
super(processorName);
this.fileSchema = constructFileSchema(processorName);
@@ -102,12 +79,10 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
getProcessorName(), systemFolder.getAbsolutePath());
}
versionController = new SimpleFileVersionController(systemFolder.getAbsolutePath());
- tsFileProcessor = new TsFileProcessor(processorName, beforeFlushAction, afterFlushAction,
- afterCloseAction, versionController, fileSchema);
- overflowProcessor = new OverflowProcessor(processorName, beforeFlushAction, afterFlushAction,
- afterCloseAction, versionController, fileSchema);
+ tsFileProcessor = new TsFileProcessor(processorName, versionController, fileSchema);
+ overflowProcessor = new OverflowProcessor(processorName, versionController, fileSchema);
- // RegistStatService
+ // RegisterStatService
if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
String statStorageDeltaName =
MonitorConstants.STAT_STORAGE_GROUP_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR
@@ -119,31 +94,49 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
}
}
- public OperationResult insert(InsertPlan insertPlan) throws IOException, BufferWriteProcessorException {
- OperationResult result = tsFileProcessor.insert(insertPlan);
+ public OperationResult insert(InsertPlan insertPlan) throws IOException, TsFileProcessorException {
+ OperationResult result = getTsFileProcessor().insert(insertPlan);
if (result == OperationResult.WRITE_REJECT_BY_TIME) {
- result = overflowProcessor.insert(insertPlan);
+ result = getOverflowProcessor().insert(insertPlan);
}
return result;
}
- public void update(UpdatePlan plan) {
- tsFileProcessor.update(plan);
- overflowProcessor.update(plan);
+ public void update(UpdatePlan plan) throws TsFileProcessorException {
+ getTsFileProcessor().update(plan);
+ getOverflowProcessor().update(plan);
}
- public void delete(String device, String measurementId, long timestamp) throws IOException {
- tsFileProcessor.delete(device, measurementId, timestamp);
- overflowProcessor.delete(device, measurementId, timestamp);
+ public void delete(String device, String measurementId, long timestamp)
+ throws TsFileProcessorException {
+ try {
+ getTsFileProcessor().delete(device, measurementId, timestamp);
+ } catch (IOException e) {
+ throw new TsFileProcessorException(e);
+ }
+ try {
+ getOverflowProcessor().delete(device, measurementId, timestamp);
+ } catch (IOException e) {
+ throw new TsFileProcessorException(e);
+ }
}
/**
* query data.
*/
public QueryDataSource query(SingleSeriesExpression seriesExpression, QueryContext context)
- throws FileNodeManagerException, IOException {
- GlobalSortedSeriesDataSource tsfileData = tsFileProcessor.query(seriesExpression, context);
- GlobalSortedSeriesDataSource overflowData = overflowProcessor.query(seriesExpression, context);
+ throws TsFileProcessorException {
+ try {
+ SeriesDataSource tsfileData = getTsFileProcessor().query(seriesExpression, context);
+ } catch (IOException e) {
+ throw new TsFileProcessorException(e);
+ }
+ try {
+ SeriesDataSource overflowData = getOverflowProcessor().query(seriesExpression, context);
+ } catch (IOException e) {
+ throw new TsFileProcessorException(e);
+ }
+
return null;
}
@@ -163,12 +156,12 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
}
@Override
- public Future<Boolean> flush() throws IOException {
+ public Future<Boolean> flush() {
return null;
}
@Override
- public void close() throws ProcessorException {
+ public void close() {
}
@@ -199,7 +192,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
return null;
}
- private FileSchema constructFileSchema(String processorName) throws WriteProcessException {
+ private FileSchema constructFileSchema(String processorName) {
List<MeasurementSchema> columnSchemaList;
columnSchemaList = MManager.getInstance().getSchemaForFileName(processorName);
@@ -210,4 +203,18 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
return schema;
}
+
+ public TsFileProcessor getTsFileProcessor() throws TsFileProcessorException {
+ if (tsFileProcessor.isClosed()) {
+ tsFileProcessor.reopen();
+ }
+ return tsFileProcessor;
+ }
+
+ public OverflowProcessor getOverflowProcessor() throws TsFileProcessorException {
+ if (overflowProcessor.isClosed()) {
+ overflowProcessor.reopen();
+ }
+ return overflowProcessor;
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index 85af489..ce2e803 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@ -30,7 +30,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -42,10 +41,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.Processor;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
@@ -56,12 +53,12 @@ import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.pool.FlushManager;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
import org.apache.iotdb.db.engine.sgmanager.OperationResult;
import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
@@ -76,7 +73,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -118,10 +114,7 @@ public class TsFileProcessor extends Processor {
private IMemTable workMemTable;
private IMemTable flushMemTable;
- private RestorableTsFileIOWriter writer;
- private Action beforeFlushAction;
- private Action afterCloseAction;
- private Action afterFlushAction;
+ protected RestorableTsFileIOWriter writer;
private File insertFile;
private TsFileResource currentResource;
@@ -139,26 +132,59 @@ public class TsFileProcessor extends Processor {
private WriteLogNode logNode;
private VersionController versionController;
+ private boolean isClosed = true;
+
/**
* constructor of BufferWriteProcessor. data will be stored in baseDir/processorName/ folder.
*
* @param processorName processor name
* @param fileSchemaRef file schema
- * @throws BufferWriteProcessorException BufferWriteProcessorException
+ * @throws TsFileProcessorException TsFileProcessorException
*/
@SuppressWarnings({"squid:S2259", "squid:S3776"})
- public TsFileProcessor(String processorName,
- Action beforeFlushAction, Action afterFlushAction, Action afterCloseAction,
- VersionController versionController,
- FileSchema fileSchemaRef) throws BufferWriteProcessorException, IOException {
+ public TsFileProcessor(String processorName, VersionController versionController,
+ FileSchema fileSchemaRef) throws TsFileProcessorException {
super(processorName);
this.fileSchemaRef = fileSchemaRef;
this.processorName = processorName;
- this.beforeFlushAction = beforeFlushAction;
- this.afterCloseAction = afterCloseAction;
- this.afterFlushAction = afterFlushAction;
- workMemTable = new PrimitiveMemTable();
+ reopen();
+
+ this.versionController = versionController;
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+ try {
+ logNode = MultiFileLogNodeManager.getInstance().getNode(
+ processorName + getLogSuffix(),
+ writer.getRestoreFilePath());
+ } catch (IOException e) {
+ throw new TsFileProcessorException(e);
+ }
+ }
+ }
+
+ public void reopen() throws TsFileProcessorException {
+ if (!isClosed) {
+ return;
+ }
+
+ if (workMemTable == null) {
+ workMemTable = new PrimitiveMemTable();
+ } else {
+ workMemTable.clear();
+ }
+
+ boolean noResources = tsFileResources == null;
+ if (noResources) {
+ initResources();
+ } else {
+ initCurrentTsFile(generateNewTsFilePath());
+ }
+
+ isClosed = false;
+ }
+
+ @SuppressWarnings({"ResultOfMethodCallIgnored"})
+ private void initResources() throws TsFileProcessorException {
tsFileResources = new ArrayList<>();
inverseIndexOfResource = new HashMap<>();
lastFlushedTimeForEachDevice = new HashMap<>();
@@ -169,76 +195,88 @@ public class TsFileProcessor extends Processor {
int unclosedFileCount = 0;
for (String folderPath : getAllDataFolders()) {
File dataFolder = new File(folderPath, processorName);
- if (dataFolder.exists()) {
+ if (!dataFolder.exists()) {
// we do not add the unclosed tsfile into tsFileResources.
File[] unclosedFiles = dataFolder
.listFiles(x -> x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX));
- unclosedFileCount += unclosedFiles.length;
+ if (unclosedFiles != null) {
+ unclosedFileCount += unclosedFiles.length;
+ }
if (unclosedFileCount > 1) {
break;
- } else if (unclosedFileCount == 1) {
+ } else if (unclosedFiles != null) {
unclosedFileName = unclosedFiles[0].getName()
.split(RestorableTsFileIOWriter.RESTORE_SUFFIX)[0];
unclosedFile = new File(unclosedFiles[0].getParentFile(), unclosedFileName);
}
- File[] datas = dataFolder
- .listFiles(x -> !x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX)
- && x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR).length == 2);
- Arrays.sort(datas, Comparator.comparingLong(x -> Long
- .parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
- for (File tsfile : datas) {
- //TODO we'd better define a file suffix for TsFile, e.g., .ts
- String[] names = tsfile.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR);
- long time = Long.parseLong(names[0]);
- if (fileNamePrefix < time) {
- fileNamePrefix = time;
- }
- if (unclosedFileCount == 0 || !tsfile.getName().equals(unclosedFileName)) {
- TsFileResource resource = new TsFileResource(tsfile, true);
- tsFileResources.add(resource);
- //maintain the inverse index and fileNamePrefix
- for (String device : resource.getDevices()) {
- inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>()).add(resource);
- lastFlushedTimeForEachDevice
- .merge(device, resource.getEndTime(device), (x, y) -> x > y ? x : y);
- }
- }
- }
+ addResources(dataFolder, unclosedFileName);
+
} else {
//processor folder does not exist
dataFolder.mkdirs();
}
}
if (unclosedFileCount > 1) {
- throw new BufferWriteProcessorException(String
+ throw new TsFileProcessorException(String
.format("TsProcessor %s has more than one unclosed TsFile. please repair it",
processorName));
- } else if (unclosedFileCount == 0) {
+ }
+ if (unclosedFile == null) {
unclosedFile = generateNewTsFilePath();
}
initCurrentTsFile(unclosedFile);
+ }
- this.versionController = versionController;
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+ // add TsFiles in dataFolder to tsFileResources and update device inserted time map
+ private void addResources(File dataFolder, String unclosedFileName)
+ throws TsFileProcessorException {
+ File[] tsFiles = dataFolder
+ .listFiles(x -> !x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX)
+ && x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR).length == 2);
+ if (tsFiles == null || tsFiles.length == 0) {
+ return;
+ }
+ Arrays.sort(tsFiles, Comparator.comparingLong(x -> Long
+ .parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
+
+ for (File tsfile : tsFiles) {
+ addResource(tsfile, unclosedFileName);
+ }
+ }
+
+ // add one TsFiles to tsFileResources and update device inserted time map
+ private void addResource(File tsfile, String unclosedFileName) throws TsFileProcessorException {
+ //TODO we'd better define a file suffix for TsFile, e.g., .ts
+ String[] names = tsfile.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR);
+ long time = Long.parseLong(names[0]);
+ if (fileNamePrefix < time) {
+ fileNamePrefix = time;
+ }
+ if (!tsfile.getName().equals(unclosedFileName)) {
+ TsFileResource resource;
try {
- logNode = MultiFileLogNodeManager.getInstance().getNode(
- processorName + getLogSuffix(),
- writer.getRestoreFilePath(),
- FileNodeManager.getInstance().getRestoreFilePath(processorName));
+ resource = new TsFileResource(tsfile, true);
} catch (IOException e) {
- throw new BufferWriteProcessorException(e);
+ throw new TsFileProcessorException(e);
+ }
+ tsFileResources.add(resource);
+ //maintain the inverse index and fileNamePrefix
+ for (String device : resource.getDevices()) {
+ inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>()).add(resource);
+ lastFlushedTimeForEachDevice
+ .merge(device, resource.getEndTime(device), (x, y) -> x > y ? x : y);
}
}
}
- private File generateNewTsFilePath() throws BufferWriteProcessorException {
+ private File generateNewTsFilePath() throws TsFileProcessorException {
String dataDir = getNextDataFolder();
File dataFolder = new File(dataDir, processorName);
if (!dataFolder.exists()) {
if (!dataFolder.mkdirs()) {
- throw new BufferWriteProcessorException(
+ throw new TsFileProcessorException(
String.format("Can not create TsFileProcess related folder: %s", dataFolder));
}
LOGGER.debug("The bufferwrite processor data dir doesn't exists, create new directory {}.",
@@ -250,13 +288,13 @@ public class TsFileProcessor extends Processor {
}
- private void initCurrentTsFile(File file) throws BufferWriteProcessorException {
+ private void initCurrentTsFile(File file) throws TsFileProcessorException {
this.insertFile = file;
try {
writer = new RestorableTsFileIOWriter(processorName, insertFile.getAbsolutePath());
this.currentResource = new TsFileResource(insertFile, writer);
} catch (IOException e) {
- throw new BufferWriteProcessorException(e);
+ throw new TsFileProcessorException(e);
}
minWrittenTimeForEachDeviceInCurrentFile.clear();
@@ -275,9 +313,12 @@ public class TsFileProcessor extends Processor {
* @param plan data to be written
* @return OperationResult (WRITE_SUCCESS, WRITE_REJECT_BY_TIME, WRITE_IN_WARNING_MEM and
* WRITE_REJECT_BY_MEM)
- * @throws BufferWriteProcessorException if a flushing operation occurs and failed.
+ * @throws TsFileProcessorException if a flushing operation occurs and failed.
*/
- public OperationResult insert(InsertPlan plan) throws BufferWriteProcessorException, IOException {
+ public OperationResult insert(InsertPlan plan) throws TsFileProcessorException, IOException {
+ if (isClosed) {
+ return OperationResult.WRITE_REJECT_BY_CLOSED_PROCESSOR;
+ }
if (!canWrite(plan.getDeviceId(), plan.getTime())) {
return OperationResult.WRITE_REJECT_BY_TIME;
}
@@ -293,11 +334,13 @@ public class TsFileProcessor extends Processor {
memUsage += MemUtils.getPointSize(type, measurement);
}
UsageLevel level = BasicMemController.getInstance().acquireUsage(this, memUsage);
+ OperationResult result;
switch (level) {
case SAFE:
doInsert(plan);
checkMemThreshold4Flush(memUsage);
- return OperationResult.WRITE_SUCCESS;
+ result = OperationResult.WRITE_SUCCESS;
+ break;
case WARNING:
if(LOGGER.isWarnEnabled()) {
LOGGER.warn("Memory usage will exceed warning threshold, current : {}.",
@@ -307,21 +350,25 @@ public class TsFileProcessor extends Processor {
try {
flush();
} catch (IOException e) {
- throw new BufferWriteProcessorException(e);
+ throw new TsFileProcessorException(e);
}
- return OperationResult.WRITE_IN_WARNING_MEM;
+ result = OperationResult.WRITE_IN_WARNING_MEM;
+ break;
case DANGEROUS:
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.",
MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
}
- return OperationResult.WRITE_REJECT_BY_MEM;
+ result = OperationResult.WRITE_REJECT_BY_MEM;
+ break;
default:
- return OperationResult.WRITE_REJECT_BY_MEM;
+ result = OperationResult.WRITE_REJECT_BY_MEM;
}
+ return result;
}
- private void doInsert(InsertPlan plan) {
+ private void doInsert(InsertPlan plan) throws TsFileProcessorException {
+ writeLog(plan);
String deviceId = plan.getDeviceId();
long time = plan.getTime();
TSDataType type;
@@ -342,11 +389,11 @@ public class TsFileProcessor extends Processor {
}
public OperationResult update(UpdatePlan plan) {
- String device = plan.getPath().getDevice();
- String measurement = plan.getPath().getMeasurement();
- List<Pair<Long, Long>> intervals = plan.getIntervals();
+// String device = plan.getPath().getDevice();
+// String measurement = plan.getPath().getMeasurement();
+// List<Pair<Long, Long>> intervals = plan.getIntervals();
//TODO modify workMemtable, flushMemtable, and existing TsFiles
- return OperationResult.WRITE_REJECT_BY_MEM;
+ throw new UnsupportedOperationException("Update unimplemented!");
}
/**
@@ -389,7 +436,7 @@ public class TsFileProcessor extends Processor {
}
- private void checkMemThreshold4Flush(long addedMemory) throws BufferWriteProcessorException {
+ private void checkMemThreshold4Flush(long addedMemory) throws TsFileProcessorException {
long newMem = memSize.addAndGet(addedMemory);
if (newMem > TSFileConfig.groupSizeInByte) {
if (LOGGER.isInfoEnabled()) {
@@ -402,7 +449,7 @@ public class TsFileProcessor extends Processor {
flush();
} catch (IOException e) {
LOGGER.error("Flush bufferwrite error.", e);
- throw new BufferWriteProcessorException(e);
+ throw new TsFileProcessorException(e);
}
}
}
@@ -443,13 +490,6 @@ public class TsFileProcessor extends Processor {
}
fileNamePrefix = System.nanoTime();
- // update the lastUpdatetime, prepare for flush
- try {
- beforeFlushAction.act();
- } catch (Exception e) {
- LOGGER.error("Failed to flush memtable into tsfile when calling the action function.");
- throw new IOException(e);
- }
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
logNode.notifyStartFlush();
}
@@ -486,7 +526,6 @@ public class TsFileProcessor extends Processor {
return true;
}
- afterFlushAction.act();
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
logNode.notifyEndFlush(null);
}
@@ -499,7 +538,7 @@ public class TsFileProcessor extends Processor {
} finally {
try {
switchFlushToWork();
- } catch (BufferWriteProcessorException e) {
+ } catch (TsFileProcessorException e) {
LOGGER.error(e.getMessage());
result = false;
}
@@ -533,7 +572,7 @@ public class TsFileProcessor extends Processor {
}
}
- private void switchFlushToWork() throws BufferWriteProcessorException {
+ private void switchFlushToWork() throws TsFileProcessorException {
flushQueryLock.lock();
try {
//we update the index of currentTsResource.
@@ -562,15 +601,15 @@ public class TsFileProcessor extends Processor {
/**
* this method do not call flush() to flush data in memory to disk.
- * @throws BufferWriteProcessorException
+ * @throws TsFileProcessorException
*/
- private void closeCurrentTsFileAndOpenNewOne() throws BufferWriteProcessorException {
+ private void closeCurrentTsFileAndOpenNewOne() throws TsFileProcessorException {
closeCurrentFile();
initCurrentTsFile(generateNewTsFilePath());
}
//very dangerous, how to make sure this function is thread safe (no other functions are running)
- private void closeCurrentFile() throws BufferWriteProcessorException {
+ private void closeCurrentFile() throws TsFileProcessorException {
try {
long closeStartTime = System.currentTimeMillis();
// end file
@@ -580,10 +619,6 @@ public class TsFileProcessor extends Processor {
Files.delete(Paths.get(insertFile.getAbsolutePath()));
} else {
writer.endFile(fileSchemaRef);
- // update the IntervalFile for interval list
- afterCloseAction.act();
- // flush the changed information for filenode
- afterFlushAction.act();
tsFileResources.add(currentResource);
//maintain the inverse index
@@ -592,6 +627,7 @@ public class TsFileProcessor extends Processor {
.add(currentResource);
}
}
+ writer = null;
// delete the restore for this bufferwrite processor
if (LOGGER.isInfoEnabled()) {
@@ -615,11 +651,11 @@ public class TsFileProcessor extends Processor {
} catch (IOException e) {
LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.",
getProcessorName(), e);
- throw new BufferWriteProcessorException(e);
+ throw new TsFileProcessorException(e);
} catch (Exception e) {
LOGGER
.error("Failed to close the bufferwrite processor when calling the action function.", e);
- throw new BufferWriteProcessorException(e);
+ throw new TsFileProcessorException(e);
}
}
@@ -647,7 +683,7 @@ public class TsFileProcessor extends Processor {
/**
* query data.
*/
- public GlobalSortedSeriesDataSource query(SingleSeriesExpression expression,
+ public SeriesDataSource query(SingleSeriesExpression expression,
QueryContext context) throws IOException {
MeasurementSchema mSchema;
TSDataType dataType;
@@ -659,11 +695,13 @@ public class TsFileProcessor extends Processor {
dataType = mSchema.getType();
// tsfile dataØØ
+ //TODO in the old version, tsfile is deep copied. I do not know why
List<TsFileResource> dataFiles = new ArrayList<>();
- for (TsFileResource tsfile : tsFileResources) {
- //TODO in the old version, tsfile is deep copied. I do not know why
- dataFiles.add(tsfile);
- }
+ tsFileResources.forEach(k -> {
+ if (k.containsDevice(deviceId)) {
+ dataFiles.add(k);
+ }
+ });
// bufferwrite data
//TODO unsealedTsFile class is a little redundant.
UnsealedTsFile unsealedTsFile = null;
@@ -680,9 +718,9 @@ public class TsFileProcessor extends Processor {
unsealedTsFile.setTimeSeriesChunkMetaDatas(chunks);
}
- return new GlobalSortedSeriesDataSource(
- new Path(deviceId + "." + measurementId), dataFiles, unsealedTsFile,
- queryDataInMemtable(deviceId, measurementId, dataType, mSchema.getProps()));
+ return new SeriesDataSource(
+ new Path(deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId), dataFiles,
+ unsealedTsFile, queryDataInMemtable(deviceId, measurementId, dataType, mSchema.getProps()));
}
/**
@@ -712,14 +750,6 @@ public class TsFileProcessor extends Processor {
}
- public String getInsertFilePath() {
- return insertFile.getAbsolutePath();
- }
-
- public WriteLogNode getLogNode() {
- return logNode;
- }
-
/**
* used for test. We can know when the flush() is called.
*
@@ -739,7 +769,7 @@ public class TsFileProcessor extends Processor {
}
@Override
- public void close() throws BufferWriteProcessorException {
+ public void close() throws TsFileProcessorException {
closeCurrentFile();
try {
if (currentResource != null) {
@@ -750,14 +780,14 @@ public class TsFileProcessor extends Processor {
}
} catch (IOException e) {
- throw new BufferWriteProcessorException(e);
+ throw new TsFileProcessorException(e);
}
}
/**
* remove all data of this processor. Used For UT
*/
- public void removeMe() throws BufferWriteProcessorException, IOException {
+ void removeMe() throws TsFileProcessorException, IOException {
try {
flushFuture.get(10000, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
@@ -769,8 +799,9 @@ public class TsFileProcessor extends Processor {
close();
for (String folder : Directories.getInstance().getAllTsFileFolders()) {
File dataFolder = new File(folder, processorName);
- if (dataFolder.exists()) {
- for (File file: dataFolder.listFiles()) {
+ File[] files;
+ if (dataFolder.exists() && (files = dataFolder.listFiles()) != null) {
+ for (File file: files) {
Files.deleteIfExists(Paths.get(file.getAbsolutePath()));
}
}
@@ -817,4 +848,19 @@ public class TsFileProcessor extends Processor {
public int hashCode() {
return super.hashCode();
}
+
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ private void writeLog(InsertPlan plan)
+ throws TsFileProcessorException {
+ try {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+ logNode.write(plan);
+ }
+ } catch (IOException e) {
+ throw new TsFileProcessorException(e);
+ }
+ }
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/TooManyChunksException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/TooManyChunksException.java
index c0062f1..1e21c89 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/TooManyChunksException.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/TooManyChunksException.java
@@ -1,3 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.iotdb.db.exception;
import java.io.File;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
similarity index 67%
copy from iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java
copy to iotdb/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
index 27ce0ff..88ffba3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/OperationResult.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
@@ -17,15 +17,22 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.sgmanager;
+package org.apache.iotdb.db.exception;
-public enum OperationResult {
- //success
- WRITE_SUCCESS,
- //success but the memory is in warning status
- WRITE_IN_WARNING_MEM,
- //the write operation is reject because of the timestamp is not allowed
- WRITE_REJECT_BY_TIME,
- //the write operation is reject because there is no available memory
- WRITE_REJECT_BY_MEM;
+public class TsFileProcessorException extends Exception {
+
+ public TsFileProcessorException() {
+ }
+
+ public TsFileProcessorException(String message) {
+ super(message);
+ }
+
+ public TsFileProcessorException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public TsFileProcessorException(Throwable cause) {
+ super(cause);
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index 42be491..cb55dfd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -28,7 +28,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.engine.filenode.FileNodeManager;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
@@ -212,7 +212,7 @@ public class QueryResourceManager {
overflowSeriesDataSource.setOverflowInsertFileList(Collections.EMPTY_LIST);
SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(selectedPath, null);
- GlobalSortedSeriesDataSource dataSource =processor.query(singleSeriesExpression, context);
+ SeriesDataSource dataSource =processor.query(singleSeriesExpression, context);
QueryDataSource queryDataSource = new QueryDataSource(dataSource, overflowSeriesDataSource);
// add used files to current thread request cached map
filePathsManager.addUsedFilesForGivenJob(context.getJobId(), queryDataSource);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
index b691d7f..668d81e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.reader.sequence;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.IBatchReader;
@@ -53,7 +53,7 @@ public class SequenceDataReader implements IBatchReader, IAggregateReader {
* @param isReverse true-traverse chunks from behind forward, false-traverse chunks from front to
* back.
*/
- public SequenceDataReader(GlobalSortedSeriesDataSource sources, Filter filter,
+ public SequenceDataReader(SeriesDataSource sources, Filter filter,
QueryContext context, boolean isReverse) throws IOException {
seriesReaders = new ArrayList<>();
@@ -100,7 +100,7 @@ public class SequenceDataReader implements IBatchReader, IAggregateReader {
/**
* init with globalSortedSeriesDataSource, filter, context and isReverse.
*/
- public SequenceDataReader(GlobalSortedSeriesDataSource sources, Filter filter,
+ public SequenceDataReader(SeriesDataSource sources, Filter filter,
QueryContext context) throws IOException {
this(sources, filter, context, false);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java
index a5fdc44..0eff726 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.reader.sequence;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.mem.MemChunkReaderByTimestamp;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
@@ -36,7 +36,7 @@ public class SequenceDataReaderByTimestamp implements EngineReaderByTimeStamp {
/**
* init with globalSortedSeriesDataSource and filter.
*/
- public SequenceDataReaderByTimestamp(GlobalSortedSeriesDataSource sources, QueryContext context)
+ public SequenceDataReaderByTimestamp(SeriesDataSource sources, QueryContext context)
throws IOException {
seriesReaders = new ArrayList<>();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index 8bf9fb9..ec8527a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -106,12 +106,11 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
@Override
- public WriteLogNode getNode(String identifier, String restoreFilePath,
- String processorStoreFilePath)
+ public WriteLogNode getNode(String identifier, String restoreFilePath)
throws IOException {
WriteLogNode node = nodeMap.get(identifier);
- if (node == null && restoreFilePath != null && processorStoreFilePath != null) {
- node = new ExclusiveWriteLogNode(identifier, restoreFilePath, processorStoreFilePath);
+ if (node == null && restoreFilePath != null) {
+ node = new ExclusiveWriteLogNode(identifier, restoreFilePath);
WriteLogNode oldNode = nodeMap.putIfAbsent(identifier, node);
if (oldNode != null) {
return oldNode;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
index 57b3e44..a2823bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
@@ -35,9 +35,8 @@ public interface WriteLogNodeManager {
*
* @param identifier -identifier, the format: "{storageGroupName}-bufferwrite/overflow"
* @param restoreFilePath -restore file path of the data file. e.g, data/settled/{storageGroupName}/{tsfileName}.restore
- * @param processorStoreFilePath -processor store file path. e.g., data/system/info/{storageGroupName}/{storageGroupName}.restore
*/
- WriteLogNode getNode(String identifier, String restoreFilePath, String processorStoreFilePath)
+ WriteLogNode getNode(String identifier, String restoreFilePath)
throws IOException;
/**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 2c1fd72..434e901 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -72,15 +72,13 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
*
* @param identifier ExclusiveWriteLogNode identifier
* @param restoreFilePath restore file path
- * @param processorStoreFilePath processor store file path
*/
- public ExclusiveWriteLogNode(String identifier, String restoreFilePath,
- String processorStoreFilePath) {
+ public ExclusiveWriteLogNode(String identifier, String restoreFilePath) {
this.identifier = identifier;
this.logDirectory = config.getWalFolder() + File.separator + this.identifier;
new File(logDirectory).mkdirs();
- recoverPerformer = new ExclusiveLogRecoverPerformer(restoreFilePath, processorStoreFilePath,
+ recoverPerformer = new ExclusiveLogRecoverPerformer(restoreFilePath,
this);
currentFileWriter = new LogWriter(logDirectory + File.separator + WAL_FILE_NAME);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
index 15e88dd..893bb1a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
@@ -55,7 +55,6 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
private ExclusiveWriteLogNode writeLogNode;
private String recoveryFlagPath;
private String restoreFilePath;
- private String processorStoreFilePath;
private RecoverStage currStage;
private LogReplayer replayer = new ConcreteLogReplayer();
private RecoverPerformer fileNodeRecoverPerformer;
@@ -65,10 +64,8 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
/**
* constructor of ExclusiveLogRecoverPerformer.
*/
- public ExclusiveLogRecoverPerformer(String restoreFilePath, String processorStoreFilePath,
- ExclusiveWriteLogNode logNode) {
+ public ExclusiveLogRecoverPerformer(String restoreFilePath, ExclusiveWriteLogNode logNode) {
this.restoreFilePath = restoreFilePath;
- this.processorStoreFilePath = processorStoreFilePath;
this.writeLogNode = logNode;
this.fileNodeRecoverPerformer = new FileNodeRecoverPerformer(writeLogNode.getIdentifier());
this.isOverflow = logNode.getFileNodeName().contains(IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
@@ -207,19 +204,6 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
}
}
- String recoverProcessorStoreFilePath = processorStoreFilePath + RECOVER_SUFFIX;
- File recoverProcessorStoreFile = new File(recoverProcessorStoreFilePath);
- File processorStoreFile = new File(processorStoreFilePath);
- if (!recoverProcessorStoreFile.exists() && processorStoreFile.exists()) {
- try {
- FileUtils.copyFile(processorStoreFile, recoverProcessorStoreFile);
- } catch (Exception e) {
- logger.error("Log node {} cannot backup processor file",
- writeLogNode.getLogDirectory(), e);
- throw new RecoverException("Cannot backup processor file, recovery aborted.");
- }
- }
-
setFlag(BACK_UP);
currStage = RECOVER_FILE;
logger.info("Log node {} backup ended", writeLogNode.getLogDirectory());
@@ -239,19 +223,6 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
throw new RecoverException("Cannot recover restore file, recovery aborted.");
}
- String recoverProcessorStoreFilePath = processorStoreFilePath + RECOVER_SUFFIX;
- File recoverProcessorStoreFile = new File(recoverProcessorStoreFilePath);
- try {
- if (recoverProcessorStoreFile.exists()) {
- FileUtils.copyFile(recoverProcessorStoreFile, new File(processorStoreFilePath));
- }
- } catch (Exception e) {
- logger.error("Log node {} cannot recover processor file, because{}",
- writeLogNode.getLogDirectory(),
- e.getMessage());
- throw new RecoverException("Cannot recover processor file, recovery aborted.");
- }
-
fileNodeRecoverPerformer.recover();
currStage = REPLAY_LOG;
@@ -336,13 +307,6 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
.error("Log node {} cannot delete backup restore file", writeLogNode.getLogDirectory());
failedFiles.add(recoverRestoreFilePath);
}
- String recoverProcessorStoreFilePath = processorStoreFilePath + RECOVER_SUFFIX;
- File recoverProcessorStoreFile = new File(recoverProcessorStoreFilePath);
- if (recoverProcessorStoreFile.exists() && !recoverProcessorStoreFile.delete()) {
- logger.error("Log node {} cannot delete backup processor store file",
- writeLogNode.getLogDirectory());
- failedFiles.add(recoverProcessorStoreFilePath);
- }
// clean log file
File oldLogFile = new File(
writeLogNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME