You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/05/29 00:58:19 UTC

[incubator-iotdb] 02/05: abstract StorageGroupManager as StorageEngine

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch refactor_overflow
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 1ea3bfff7b892c696e93d7cbb74e695f07efc96d
Author: 江天 <jt...@163.com>
AuthorDate: Tue May 28 12:43:38 2019 +0800

    abstract StorageGroupManager as StorageEngine
---
 .../db/engine/memcontrol/FlushPartialPolicy.java   |   3 +-
 .../db/engine/overflow/io/OverflowProcessor.java   |   1 -
 .../iotdb/db/engine/sgmanager/StorageEngine.java   | 167 +++++++++++++++++++++
 .../db/engine/sgmanager/StorageEngineFactory.java  |  27 ++++
 .../db/engine/sgmanager/StorageGroupManager.java   | 136 ++++++-----------
 5 files changed, 237 insertions(+), 97 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java
index 31a5860..a3604a3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java
@@ -20,13 +20,12 @@ package org.apache.iotdb.db.engine.memcontrol;
 
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class only gives a hint to FilenodeManager that it may flush some data to avoid rush hour.
+ * This class only gives a hint to StorageGroupManager that it may flush some data to avoid rush hour.
  */
 public class FlushPartialPolicy implements Policy {
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index f052876..466138f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.Processor;
 import org.apache.iotdb.db.engine.bufferwrite.Action;
 import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
 import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngine.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngine.java
new file mode 100644
index 0000000..72c7dcf
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngine.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.sgmanager;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageGroupManagerException;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+
+/**
+ * StorageEngine is an abstraction of IoTDB storage-level interfaces.
+ */
+public interface StorageEngine {
+
+  /**
+   * This function is just for unit test.
+   */
+  void reset();
+
+  /**
+   * Execute an insertion.
+   *
+   * @param plan an insert plan
+   * @param isMonitor if true, the insertion is done by StatMonitor and the statistic Info will not
+   * be recorded. if false, the statParamsHashMap will be updated.
+   */
+  void insert(InsertPlan plan, boolean isMonitor) throws StorageGroupManagerException;
+
+  /**
+   * update data.
+   */
+  void update(String deviceId, String measurementId, long startTime, long endTime,
+      TSDataType type, String v)
+      throws StorageGroupManagerException;
+
+  /**
+   * Delete data whose timestamp <= 'timestamp' of time series 'deviceId'.'measurementId'.
+   * @param deviceId
+   * @param measurementId
+   * @param timestamp
+   * @throws StorageGroupManagerException
+   */
+  void deleteData(String deviceId, String measurementId, long timestamp)
+          throws StorageGroupManagerException;
+
+  /**
+   * Similar to deleteData(), but only deletes data in sequence files. Only used by WAL recovery.
+   */
+  void deleteInSeqFile(String deviceId, String measurementId, long timestamp)
+              throws StorageGroupManagerException;
+
+  /**
+   * Similar to delete(), but only deletes data in Overflow. Only used by WAL recovery.
+   */
+  void deleteInOverflow(String deviceId, String measurementId, long timestamp)
+                  throws StorageGroupManagerException;
+
+  /**
+   * Get a StorageGroup-level token for this query so that the StorageGroupProcessor may know which
+   * queries are occupying resources.
+   *
+   * @param deviceId queried deviceId
+   * @return a query token for the device.
+   */
+  int beginQuery(String deviceId) throws StorageGroupManagerException;
+
+  /**
+   * Notify the storage group of 'deviceId' that query 'token' has ended and its resource can be
+   * released.
+   */
+  void endQuery(String deviceId, int token) throws StorageGroupManagerException;
+
+  /**
+   * Find sealed files, unsealed file and memtable data in SeqFiles and OverflowFiles that contains the
+   * given series.
+   * @param seriesExpression provides the path of the series.
+   * @param context provides shared modifications across a query.
+   * @return sealed files, unsealed file and memtable data in SeqFiles or OverflowFiles
+   * @throws StorageGroupManagerException
+   */
+  QueryDataSource query(SingleSeriesExpression seriesExpression, QueryContext context)
+      throws StorageGroupManagerException;
+
+  /**
+   * Append one specified tsfile to the storage group. <b>This method is only provided for
+   * transmission module</b>
+   *
+   * @param storageGroupName the seriesPath of storage group
+   * @param appendFile the appended tsfile information
+   */
+  boolean appendFileToStorageGroup(String storageGroupName, TsFileResource appendFile,
+      String appendFilePath) throws StorageGroupManagerException;
+
+  /**
+   * get all overlap tsfiles which are conflict with the appendFile.
+   *
+   * @param storageGroupName the seriesPath of storage group
+   * @param appendFile the appended tsfile information
+   */
+  List<String> getOverlapFilesFromStorageGroup(String storageGroupName, TsFileResource appendFile,
+      String uuid) throws StorageGroupManagerException;
+
+  /**
+   * merge all overflowed storage group.
+   *
+   * @throws StorageGroupManagerException StorageGroupManagerException
+   */
+  void mergeAll() throws StorageGroupManagerException;
+
+  /**
+   * delete one storage group.
+   */
+  void deleteOneStorageGroup(String processorName) throws StorageGroupManagerException;
+
+  /**
+   * add time series.
+   */
+  void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding,
+      CompressionType compressor,
+      Map<String, String> props) throws StorageGroupManagerException;
+
+  /**
+   * Force to close the storage group processor.
+   */
+  void closeStorageGroup(String processorName) throws StorageGroupManagerException;
+
+  /**
+   * delete all storage groups.
+   */
+  boolean deleteAll() throws StorageGroupManagerException;
+
+  /**
+   * delete all storage groups.
+   */
+  void closeAll() throws StorageGroupManagerException;
+
+  /**
+   * force flush to control memory usage.
+   */
+  void forceFlush(BasicMemController.UsageLevel level);
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngineFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngineFactory.java
new file mode 100644
index 0000000..85218a7
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngineFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.sgmanager;
+
+public class StorageEngineFactory {
+
+  public static StorageEngine getCurrent() {
+    return StorageGroupManager.getInstance();
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java
index d313c38..22d37b8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java
@@ -36,15 +36,11 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
 import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
-import org.apache.iotdb.db.engine.pool.FlushManager;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.exception.StorageGroupManagerException;
-import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.StorageGroupManagerException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.IStatistic;
@@ -71,7 +67,7 @@ import org.slf4j.LoggerFactory;
  * StorageGroupManager provides top-level interfaces to access IoTDB storage engine. It decides
  * which StorageGroup(s) to access in order to complete a query.
  */
-public class StorageGroupManager implements IStatistic, IService {
+public class StorageGroupManager implements IStatistic, IService, StorageEngine {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(StorageGroupManager.class);
   private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig();
@@ -92,9 +88,10 @@ public class StorageGroupManager implements IStatistic, IService {
 
   private HashMap<String, AtomicLong> statParamsHashMap;
 
-  private StorageGroupManager() {
+  private StorageGroupManager() throws StorageGroupManagerException {
     processorMap = new ConcurrentHashMap<>();
     initStat();
+    recover();
   }
 
   private void initStat() {
@@ -112,14 +109,14 @@ public class StorageGroupManager implements IStatistic, IService {
   }
 
   public static StorageGroupManager getInstance() {
-    return StorageGroupManagerHolder.INSTANCE;
+    return StorageGroupManagerHolder.instance;
   }
 
-  private void updateStatHashMapWhenFail(TSRecord tsRecord) {
+  private void updateStatHashMapWhenFail(InsertPlan plan) {
     statParamsHashMap.get(StorageGroupManagerStatConstants.TOTAL_REQ_FAIL.name())
         .incrementAndGet();
     statParamsHashMap.get(StorageGroupManagerStatConstants.TOTAL_POINTS_FAIL.name())
-        .addAndGet(tsRecord.dataPointList.size());
+        .addAndGet(plan.getValues().length);
   }
 
   /**
@@ -172,7 +169,8 @@ public class StorageGroupManager implements IStatistic, IService {
   /**
    * This function is just for unit test.
    */
-  public synchronized void resetStorageGroupManager() {
+  @Override
+  public synchronized void reset() {
     for (String key : statParamsHashMap.keySet()) {
       statParamsHashMap.put(key, new AtomicLong());
     }
@@ -232,7 +230,7 @@ public class StorageGroupManager implements IStatistic, IService {
   /**
    * recover the StorageGroupProcessors.
    */
-  public void recover() throws StorageGroupManagerException {
+  private void recover() throws StorageGroupManagerException {
     List<String> storageGroupNames;
     try {
       storageGroupNames = MManager.getInstance().getAllStorageGroups();
@@ -254,13 +252,7 @@ public class StorageGroupManager implements IStatistic, IService {
     }
   }
 
-  /**
-   * insert TsRecord into storage group.
-   *
-   * @param plan an insert plan
-   * @param isMonitor if true, the insertion is done by StatMonitor and the statistic Info will not
-   * be recorded. if false, the statParamsHashMap will be updated.
-   */
+  @Override
   public void insert(InsertPlan plan, boolean isMonitor) throws StorageGroupManagerException {
     long timestamp = plan.getTime();
 
@@ -277,6 +269,7 @@ public class StorageGroupManager implements IStatistic, IService {
             result.toString()));
       }
     } catch (TsFileProcessorException e) {
+      updateStatHashMapWhenFail(plan);
       throw new StorageGroupManagerException(String.format("Fail to write in SG %s",
           processor.getProcessorName()), e);
     } finally {
@@ -313,23 +306,15 @@ public class StorageGroupManager implements IStatistic, IService {
   }
 
 
-  /**
-   * update data.
-   */
+  @Override
   public void update(String deviceId, String measurementId, long startTime, long endTime,
       TSDataType type, String v)
       throws StorageGroupManagerException {
     throw new UnsupportedOperationException("Method unimplemented");
   }
 
-  /**
-   * Delete data whose timestamp <= 'timestamp' of time series 'deviceId'.'measurementId'.
-   * @param deviceId
-   * @param measurementId
-   * @param timestamp
-   * @throws StorageGroupManagerException
-   */
-  public void deleteProcessor(String deviceId, String measurementId, long timestamp)
+  @Override
+  public void deleteData(String deviceId, String measurementId, long timestamp)
       throws StorageGroupManagerException {
 
     StorageGroupProcessor processor = getProcessor(deviceId, true);
@@ -392,9 +377,7 @@ public class StorageGroupManager implements IStatistic, IService {
     }
   }
 
-  /**
-   * Similar to delete(), but only deletes data in sequence files. Only used by WAL recovery.
-   */
+  @Override
   public void deleteInSeqFile(String deviceId, String measurementId, long timestamp)
       throws StorageGroupManagerException {
     StorageGroupProcessor processor = getProcessor(deviceId, true);
@@ -405,9 +388,7 @@ public class StorageGroupManager implements IStatistic, IService {
     }
   }
 
-  /**
-   * Similar to delete(), but only deletes data in Overflow. Only used by WAL recovery.
-   */
+  @Override
   public void deleteInOverflow(String deviceId, String measurementId, long timestamp)
       throws StorageGroupManagerException {
     StorageGroupProcessor processor = getProcessor(deviceId, true);
@@ -418,13 +399,7 @@ public class StorageGroupManager implements IStatistic, IService {
     }
   }
 
-  /**
-   * Get a StorageGroup-level token for this query so that the StorageGroupProcessor may know which
-   * queries are occupying resources.
-   *
-   * @param deviceId queried deviceId
-   * @return a query token for the device.
-   */
+  @Override
   public int beginQuery(String deviceId) throws StorageGroupManagerException {
     StorageGroupProcessor processor = getProcessor(deviceId, true);
     try {
@@ -438,10 +413,7 @@ public class StorageGroupManager implements IStatistic, IService {
     }
   }
 
-  /**
-   * Notify the storage group of 'deviceId' that query 'token' has ended and its resource can be
-   * released.
-   */
+  @Override
   public void endQuery(String deviceId, int token) throws StorageGroupManagerException {
 
     StorageGroupProcessor processorrocessor = getProcessor(deviceId, true);
@@ -456,14 +428,7 @@ public class StorageGroupManager implements IStatistic, IService {
     }
   }
 
-  /**
-   * Find sealed files, unsealed file and memtable data in SeqFiles and OverflowFiles that contains the
-   * given series.
-   * @param seriesExpression provides the path of the series.
-   * @param context provides shared modifications across a query.
-   * @return sealed files, unsealed file and memtable data in SeqFiles or OverflowFiles
-   * @throws StorageGroupManagerException
-   */
+  @Override
   public QueryDataSource query(SingleSeriesExpression seriesExpression, QueryContext context)
       throws StorageGroupManagerException {
     String deviceId = seriesExpression.getSeriesPath().getDevice();
@@ -486,13 +451,7 @@ public class StorageGroupManager implements IStatistic, IService {
     }
   }
 
-  /**
-   * Append one specified tsfile to the storage group. <b>This method is only provided for
-   * transmission module</b>
-   *
-   * @param storageGroupName the seriesPath of storage group
-   * @param appendFile the appended tsfile information
-   */
+  @Override
   public boolean appendFileToStorageGroup(String storageGroupName, TsFileResource appendFile,
       String appendFilePath) throws StorageGroupManagerException {
     StorageGroupProcessor processor = getProcessor(storageGroupName, true);
@@ -516,13 +475,9 @@ public class StorageGroupManager implements IStatistic, IService {
     return true;
   }
 
-  /**
-   * get all overlap tsfiles which are conflict with the appendFile.
-   *
-   * @param storageGroupName the seriesPath of storage group
-   * @param appendFile the appended tsfile information
-   */
-  public List<String> getOverlapFilesFromStorageGroup(String storageGroupName, TsFileResource appendFile,
+  @Override
+  public List<String> getOverlapFilesFromStorageGroup(String storageGroupName,
+      TsFileResource appendFile,
       String uuid) throws StorageGroupManagerException {
     StorageGroupProcessor processor = getProcessor(storageGroupName, true);
     List<String> overlapFiles;
@@ -536,11 +491,8 @@ public class StorageGroupManager implements IStatistic, IService {
     return overlapFiles;
   }
 
-  /**
-   * merge all overflowed storage group.
-   *
-   * @throws StorageGroupManagerException StorageGroupManagerException
-   */
+
+  @Override
   public void mergeAll() throws StorageGroupManagerException {
     if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) {
       LOGGER.warn("Unable to merge all storage groups when the status is {}",
@@ -649,9 +601,7 @@ public class StorageGroupManager implements IStatistic, IService {
     }
   }
 
-  /**
-   * delete one storage group.
-   */
+  @Override
   public void deleteOneStorageGroup(String processorName) throws StorageGroupManagerException {
     if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) {
       return;
@@ -750,9 +700,7 @@ public class StorageGroupManager implements IStatistic, IService {
     return res;
   }
 
-  /**
-   * add time series.
-   */
+  @Override
   public void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding,
       CompressionType compressor,
       Map<String, String> props) throws StorageGroupManagerException {
@@ -764,10 +712,7 @@ public class StorageGroupManager implements IStatistic, IService {
     }
   }
 
-
-  /**
-   * Force to close the storage group processor.
-   */
+  @Override
   public void closeStorageGroup(String processorName) throws StorageGroupManagerException {
     if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) {
       return;
@@ -824,9 +769,7 @@ public class StorageGroupManager implements IStatistic, IService {
     }
   }
 
-  /**
-   * delete all storage groups.
-   */
+  @Override
   public synchronized boolean deleteAll() throws StorageGroupManagerException {
     LOGGER.info("Start deleting all storage group");
     if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) {
@@ -849,9 +792,7 @@ public class StorageGroupManager implements IStatistic, IService {
     }
   }
 
-  /**
-   * Try to close All.
-   */
+  @Override
   public void closeAll() throws StorageGroupManagerException {
     LOGGER.info("Start closing all storage group processor");
     if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) {
@@ -869,9 +810,7 @@ public class StorageGroupManager implements IStatistic, IService {
     }
   }
 
-  /**
-   * force flush to control memory usage.
-   */
+  @Override
   public void forceFlush(BasicMemController.UsageLevel level) {
     switch (level) {
       // only select the most urgent (most active or biggest in size)
@@ -981,7 +920,16 @@ public class StorageGroupManager implements IStatistic, IService {
     private StorageGroupManagerHolder() {
     }
 
-    private static final StorageGroupManager INSTANCE = new StorageGroupManager();
+    private static StorageGroupManager instance;
+
+    static {
+      try {
+        instance = new StorageGroupManager();
+      } catch (StorageGroupManagerException e) {
+        LOGGER.error("Failed to initialize StorageGroupManager due to a FATAL error", e);
+        instance = null;
+      }
+    }
   }
 
 }
\ No newline at end of file