You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/05/29 00:58:19 UTC
[incubator-iotdb] 02/05: abstract StorageGroupManager as
StorageEngine
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch refactor_overflow
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 1ea3bfff7b892c696e93d7cbb74e695f07efc96d
Author: 江天 <jt...@163.com>
AuthorDate: Tue May 28 12:43:38 2019 +0800
abstract StorageGroupManager as StorageEngine
---
.../db/engine/memcontrol/FlushPartialPolicy.java | 3 +-
.../db/engine/overflow/io/OverflowProcessor.java | 1 -
.../iotdb/db/engine/sgmanager/StorageEngine.java | 167 +++++++++++++++++++++
.../db/engine/sgmanager/StorageEngineFactory.java | 27 ++++
.../db/engine/sgmanager/StorageGroupManager.java | 136 ++++++-----------
5 files changed, 237 insertions(+), 97 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java
index 31a5860..a3604a3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java
@@ -20,13 +20,12 @@ package org.apache.iotdb.db.engine.memcontrol;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.utils.MemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This class only gives a hint to FilenodeManager that it may flush some data to avoid rush hour.
+ * This class only gives a hint to StorageGroupManager that it may flush some data to avoid rush hour.
*/
public class FlushPartialPolicy implements Policy {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index f052876..466138f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.Processor;
import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngine.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngine.java
new file mode 100644
index 0000000..72c7dcf
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngine.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.sgmanager;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageGroupManagerException;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+
+/**
+ * StorageEngine is an abstraction of IoTDB storage-level interfaces.
+ */
+public interface StorageEngine {
+
+ /**
+ * Reset the engine's state. This method is only intended for unit tests.
+ */
+ void reset();
+
+ /**
+ * Execute an insertion.
+ *
+ * @param plan an insert plan
+ * @param isMonitor if true, the insertion is done by StatMonitor and the statistic Info will not
+ * be recorded. if false, the statParamsHashMap will be updated.
+ */
+ void insert(InsertPlan plan, boolean isMonitor) throws StorageGroupManagerException;
+
+ /**
+ * update data.
+ */
+ void update(String deviceId, String measurementId, long startTime, long endTime,
+ TSDataType type, String v)
+ throws StorageGroupManagerException;
+
+ /**
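+ * Delete data of time series 'deviceId'.'measurementId' whose timestamp <= 'timestamp'.
+ * @param deviceId the device that the time series belongs to
+ * @param measurementId the measurement of the time series
+ * @param timestamp the inclusive upper bound of the timestamps to be deleted
+ * @throws StorageGroupManagerException if the deletion fails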
+ */
+ void deleteData(String deviceId, String measurementId, long timestamp)
+ throws StorageGroupManagerException;
+
+ /**
+ * Similar to deleteData(), but only deletes data in sequence files. Only used by WAL recovery.
+ */
+ void deleteInSeqFile(String deviceId, String measurementId, long timestamp)
+ throws StorageGroupManagerException;
+
+ /**
+ * Similar to deleteData(), but only deletes data in Overflow. Only used by WAL recovery.
+ */
+ void deleteInOverflow(String deviceId, String measurementId, long timestamp)
+ throws StorageGroupManagerException;
+
+ /**
+ * Get a StorageGroup-level token for this query so that the StorageGroupProcessor may know which
+ * queries are occupying resources.
+ *
+ * @param deviceId queried deviceId
+ * @return a query token for the device.
+ */
+ int beginQuery(String deviceId) throws StorageGroupManagerException;
+
+ /**
+ * Notify the storage group of 'deviceId' that query 'token' has ended and its resource can be
+ * released.
+ */
+ void endQuery(String deviceId, int token) throws StorageGroupManagerException;
+
+ /**
+ * Find sealed files, unsealed file and memtable data in SeqFiles and OverflowFiles that contain
+ * the given series.
+ * @param seriesExpression provides the path of the series.
+ * @param context provides shared modifications across a query.
+ * @return sealed files, unsealed file and memtable data in SeqFiles and OverflowFiles
+ * @throws StorageGroupManagerException if the data sources of the series cannot be collected
+ */
+ QueryDataSource query(SingleSeriesExpression seriesExpression, QueryContext context)
+ throws StorageGroupManagerException;
+
+ /**
+ * Append one specified tsfile to the storage group. <b>This method is only provided for
+ * transmission module</b>
+ *
+ * @param storageGroupName the seriesPath of storage group
+ * @param appendFile the appended tsfile information
+ */
+ boolean appendFileToStorageGroup(String storageGroupName, TsFileResource appendFile,
+ String appendFilePath) throws StorageGroupManagerException;
+
+ /**
+ * Get all tsfiles in the storage group that overlap (conflict) with the appendFile.
+ *
+ * @param storageGroupName the seriesPath of storage group
+ * @param appendFile the appended tsfile information
+ */
+ List<String> getOverlapFilesFromStorageGroup(String storageGroupName, TsFileResource appendFile,
+ String uuid) throws StorageGroupManagerException;
+
+ /**
+ * Merge all overflowed storage groups.
+ *
+ * @throws StorageGroupManagerException if the merge fails
+ */
+ void mergeAll() throws StorageGroupManagerException;
+
+ /**
+ * delete one storage group.
+ */
+ void deleteOneStorageGroup(String processorName) throws StorageGroupManagerException;
+
+ /**
+ * add time series.
+ */
+ void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding,
+ CompressionType compressor,
+ Map<String, String> props) throws StorageGroupManagerException;
+
+ /**
+ * Force to close the storage group processor.
+ */
+ void closeStorageGroup(String processorName) throws StorageGroupManagerException;
+
+ /**
+ * delete all storage groups.
+ */
+ boolean deleteAll() throws StorageGroupManagerException;
+
+ /**
+ * close all storage group processors.
+ */
+ void closeAll() throws StorageGroupManagerException;
+
+ /**
+ * force flush to control memory usage.
+ */
+ void forceFlush(BasicMemController.UsageLevel level);
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngineFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngineFactory.java
new file mode 100644
index 0000000..85218a7
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngineFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.sgmanager;
+
+public class StorageEngineFactory {
+
+ public static StorageEngine getCurrent() {
+ return StorageGroupManager.getInstance();
+ }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java
index d313c38..22d37b8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java
@@ -36,15 +36,11 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
-import org.apache.iotdb.db.engine.pool.FlushManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.exception.StorageGroupManagerException;
-import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.StorageGroupManagerException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.IStatistic;
@@ -71,7 +67,7 @@ import org.slf4j.LoggerFactory;
* StorageGroupManager provides top-level interfaces to access IoTDB storage engine. It decides
* which StorageGroup(s) to access in order to complete a query.
*/
-public class StorageGroupManager implements IStatistic, IService {
+public class StorageGroupManager implements IStatistic, IService, StorageEngine {
private static final Logger LOGGER = LoggerFactory.getLogger(StorageGroupManager.class);
private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig();
@@ -92,9 +88,10 @@ public class StorageGroupManager implements IStatistic, IService {
private HashMap<String, AtomicLong> statParamsHashMap;
- private StorageGroupManager() {
+ private StorageGroupManager() throws StorageGroupManagerException {
processorMap = new ConcurrentHashMap<>();
initStat();
+ recover();
}
private void initStat() {
@@ -112,14 +109,14 @@ public class StorageGroupManager implements IStatistic, IService {
}
public static StorageGroupManager getInstance() {
- return StorageGroupManagerHolder.INSTANCE;
+ return StorageGroupManagerHolder.instance;
}
- private void updateStatHashMapWhenFail(TSRecord tsRecord) {
+ private void updateStatHashMapWhenFail(InsertPlan plan) {
statParamsHashMap.get(StorageGroupManagerStatConstants.TOTAL_REQ_FAIL.name())
.incrementAndGet();
statParamsHashMap.get(StorageGroupManagerStatConstants.TOTAL_POINTS_FAIL.name())
- .addAndGet(tsRecord.dataPointList.size());
+ .addAndGet(plan.getValues().length);
}
/**
@@ -172,7 +169,8 @@ public class StorageGroupManager implements IStatistic, IService {
/**
* This function is just for unit test.
*/
- public synchronized void resetStorageGroupManager() {
+ @Override
+ public synchronized void reset() {
for (String key : statParamsHashMap.keySet()) {
statParamsHashMap.put(key, new AtomicLong());
}
@@ -232,7 +230,7 @@ public class StorageGroupManager implements IStatistic, IService {
/**
* recover the StorageGroupProcessors.
*/
- public void recover() throws StorageGroupManagerException {
+ private void recover() throws StorageGroupManagerException {
List<String> storageGroupNames;
try {
storageGroupNames = MManager.getInstance().getAllStorageGroups();
@@ -254,13 +252,7 @@ public class StorageGroupManager implements IStatistic, IService {
}
}
- /**
- * insert TsRecord into storage group.
- *
- * @param plan an insert plan
- * @param isMonitor if true, the insertion is done by StatMonitor and the statistic Info will not
- * be recorded. if false, the statParamsHashMap will be updated.
- */
+ @Override
public void insert(InsertPlan plan, boolean isMonitor) throws StorageGroupManagerException {
long timestamp = plan.getTime();
@@ -277,6 +269,7 @@ public class StorageGroupManager implements IStatistic, IService {
result.toString()));
}
} catch (TsFileProcessorException e) {
+ updateStatHashMapWhenFail(plan);
throw new StorageGroupManagerException(String.format("Fail to write in SG %s",
processor.getProcessorName()), e);
} finally {
@@ -313,23 +306,15 @@ public class StorageGroupManager implements IStatistic, IService {
}
- /**
- * update data.
- */
+ @Override
public void update(String deviceId, String measurementId, long startTime, long endTime,
TSDataType type, String v)
throws StorageGroupManagerException {
throw new UnsupportedOperationException("Method unimplemented");
}
- /**
- * Delete data whose timestamp <= 'timestamp' of time series 'deviceId'.'measurementId'.
- * @param deviceId
- * @param measurementId
- * @param timestamp
- * @throws StorageGroupManagerException
- */
- public void deleteProcessor(String deviceId, String measurementId, long timestamp)
+ @Override
+ public void deleteData(String deviceId, String measurementId, long timestamp)
throws StorageGroupManagerException {
StorageGroupProcessor processor = getProcessor(deviceId, true);
@@ -392,9 +377,7 @@ public class StorageGroupManager implements IStatistic, IService {
}
}
- /**
- * Similar to delete(), but only deletes data in sequence files. Only used by WAL recovery.
- */
+ @Override
public void deleteInSeqFile(String deviceId, String measurementId, long timestamp)
throws StorageGroupManagerException {
StorageGroupProcessor processor = getProcessor(deviceId, true);
@@ -405,9 +388,7 @@ public class StorageGroupManager implements IStatistic, IService {
}
}
- /**
- * Similar to delete(), but only deletes data in Overflow. Only used by WAL recovery.
- */
+ @Override
public void deleteInOverflow(String deviceId, String measurementId, long timestamp)
throws StorageGroupManagerException {
StorageGroupProcessor processor = getProcessor(deviceId, true);
@@ -418,13 +399,7 @@ public class StorageGroupManager implements IStatistic, IService {
}
}
- /**
- * Get a StorageGroup-level token for this query so that the StorageGroupProcessor may know which
- * queries are occupying resources.
- *
- * @param deviceId queried deviceId
- * @return a query token for the device.
- */
+ @Override
public int beginQuery(String deviceId) throws StorageGroupManagerException {
StorageGroupProcessor processor = getProcessor(deviceId, true);
try {
@@ -438,10 +413,7 @@ public class StorageGroupManager implements IStatistic, IService {
}
}
- /**
- * Notify the storage group of 'deviceId' that query 'token' has ended and its resource can be
- * released.
- */
+ @Override
public void endQuery(String deviceId, int token) throws StorageGroupManagerException {
StorageGroupProcessor processorrocessor = getProcessor(deviceId, true);
@@ -456,14 +428,7 @@ public class StorageGroupManager implements IStatistic, IService {
}
}
- /**
- * Find sealed files, unsealed file and memtable data in SeqFiles and OverflowFiles that contains the
- * given series.
- * @param seriesExpression provides the path of the series.
- * @param context provides shared modifications across a query.
- * @return sealed files, unsealed file and memtable data in SeqFiles or OverflowFiles
- * @throws StorageGroupManagerException
- */
+ @Override
public QueryDataSource query(SingleSeriesExpression seriesExpression, QueryContext context)
throws StorageGroupManagerException {
String deviceId = seriesExpression.getSeriesPath().getDevice();
@@ -486,13 +451,7 @@ public class StorageGroupManager implements IStatistic, IService {
}
}
- /**
- * Append one specified tsfile to the storage group. <b>This method is only provided for
- * transmission module</b>
- *
- * @param storageGroupName the seriesPath of storage group
- * @param appendFile the appended tsfile information
- */
+ @Override
public boolean appendFileToStorageGroup(String storageGroupName, TsFileResource appendFile,
String appendFilePath) throws StorageGroupManagerException {
StorageGroupProcessor processor = getProcessor(storageGroupName, true);
@@ -516,13 +475,9 @@ public class StorageGroupManager implements IStatistic, IService {
return true;
}
- /**
- * get all overlap tsfiles which are conflict with the appendFile.
- *
- * @param storageGroupName the seriesPath of storage group
- * @param appendFile the appended tsfile information
- */
- public List<String> getOverlapFilesFromStorageGroup(String storageGroupName, TsFileResource appendFile,
+ @Override
+ public List<String> getOverlapFilesFromStorageGroup(String storageGroupName,
+ TsFileResource appendFile,
String uuid) throws StorageGroupManagerException {
StorageGroupProcessor processor = getProcessor(storageGroupName, true);
List<String> overlapFiles;
@@ -536,11 +491,8 @@ public class StorageGroupManager implements IStatistic, IService {
return overlapFiles;
}
- /**
- * merge all overflowed storage group.
- *
- * @throws StorageGroupManagerException StorageGroupManagerException
- */
+
+ @Override
public void mergeAll() throws StorageGroupManagerException {
if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) {
LOGGER.warn("Unable to merge all storage groups when the status is {}",
@@ -649,9 +601,7 @@ public class StorageGroupManager implements IStatistic, IService {
}
}
- /**
- * delete one storage group.
- */
+ @Override
public void deleteOneStorageGroup(String processorName) throws StorageGroupManagerException {
if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) {
return;
@@ -750,9 +700,7 @@ public class StorageGroupManager implements IStatistic, IService {
return res;
}
- /**
- * add time series.
- */
+ @Override
public void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding,
CompressionType compressor,
Map<String, String> props) throws StorageGroupManagerException {
@@ -764,10 +712,7 @@ public class StorageGroupManager implements IStatistic, IService {
}
}
-
- /**
- * Force to close the storage group processor.
- */
+ @Override
public void closeStorageGroup(String processorName) throws StorageGroupManagerException {
if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) {
return;
@@ -824,9 +769,7 @@ public class StorageGroupManager implements IStatistic, IService {
}
}
- /**
- * delete all storage groups.
- */
+ @Override
public synchronized boolean deleteAll() throws StorageGroupManagerException {
LOGGER.info("Start deleting all storage group");
if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) {
@@ -849,9 +792,7 @@ public class StorageGroupManager implements IStatistic, IService {
}
}
- /**
- * Try to close All.
- */
+ @Override
public void closeAll() throws StorageGroupManagerException {
LOGGER.info("Start closing all storage group processor");
if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) {
@@ -869,9 +810,7 @@ public class StorageGroupManager implements IStatistic, IService {
}
}
- /**
- * force flush to control memory usage.
- */
+ @Override
public void forceFlush(BasicMemController.UsageLevel level) {
switch (level) {
// only select the most urgent (most active or biggest in size)
@@ -981,7 +920,16 @@ public class StorageGroupManager implements IStatistic, IService {
private StorageGroupManagerHolder() {
}
- private static final StorageGroupManager INSTANCE = new StorageGroupManager();
+ private static StorageGroupManager instance;
+
+ static {
+ try {
+ instance = new StorageGroupManager();
+ } catch (StorageGroupManagerException e) {
+ LOGGER.error("Failed to initialize StorageGroupManager due to a FATAL error", e);
+ instance = null;
+ }
+ }
}
}
\ No newline at end of file