You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/11/05 13:35:36 UTC

[incubator-iotdb] branch master updated: [IOTDB-218] Refactor insert, update and delete (#520)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c08157  [IOTDB-218] Refactor insert, update and delete (#520)
4c08157 is described below

commit 4c08157b950d50a3e64f5c9613fde490fa791393
Author: SilverNarcissus <15...@smail.nju.edu.cn>
AuthorDate: Tue Nov 5 07:35:27 2019 -0600

    [IOTDB-218] Refactor insert, update and delete (#520)
    
    * make insert, update and delete return void
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  5 ++-
 .../engine/storagegroup/StorageGroupProcessor.java | 39 ++++++++--------------
 .../qp/executor/AbstractQueryProcessExecutor.java  |  6 ++--
 .../db/qp/executor/IQueryProcessExecutor.java      | 13 +++-----
 .../iotdb/db/qp/executor/QueryProcessExecutor.java | 22 ++++++------
 .../iotdb/db/engine/storagegroup/TTLTest.java      |  6 ++--
 .../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 29 ++++++++--------
 7 files changed, 50 insertions(+), 70 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index db41982..ab744be 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -204,9 +204,8 @@ public class StorageEngine implements IService {
    * insert an InsertPlan to a storage group.
    *
    * @param insertPlan physical plan of insertion
-   * @return true if and only if this insertion succeeds
    */
-  public boolean insert(InsertPlan insertPlan) throws ProcessorException {
+  public void insert(InsertPlan insertPlan) throws ProcessorException {
 
     StorageGroupProcessor storageGroupProcessor;
     try {
@@ -220,7 +219,7 @@ public class StorageEngine implements IService {
 
     // TODO monitor: update statistics
     try {
-      return storageGroupProcessor.insert(insertPlan);
+      storageGroupProcessor.insert(insertPlan);
     } catch (QueryProcessorException e) {
       throw new ProcessorException(e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index e85f2fc..f51b78b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -114,6 +114,7 @@ public class StorageGroupProcessor {
 
   private static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
   private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
+  private static final int MAX_CACHE_SENSORS = 5000;
   /**
    * a read write lock for guaranteeing concurrent safety when accessing all fields in this class
    * (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
@@ -156,34 +157,27 @@ public class StorageGroupProcessor {
   private Map<String, Long> latestFlushedTimeForEachDevice = new HashMap<>();
   private String storageGroupName;
   private File storageGroupSysDir;
-
   /**
    * versionController assigns a version for each MemTable and deletion/update such that after they
    * are persisted, the order of insertions, deletions and updates can be re-determined.
    */
   private VersionController versionController;
-
   /**
    * mergeLock is to be used in the merge process. Concurrent queries, deletions and merges may
    * result in losing some deletion in the merged new file, so a lock is necessary.
    */
   private ReentrantReadWriteLock mergeLock = new ReentrantReadWriteLock();
-
   /**
    * This is the modification file of the result of the current merge. Because the merged file may
    * be invisible at this moment, without this, deletion/update during merge could be lost.
    */
   private ModificationFile mergingModification;
-
   private volatile boolean isMerging = false;
   private long mergeStartTime;
-
   /**
    * This linked list records the access order of measurements used by query.
    */
   private LinkedList<String> lruForSensorUsedInQuery = new LinkedList<>();
-  private static final int MAX_CACHE_SENSORS = 5000;
-
   /**
    * when the data in a storage group is older than dataTTL, it is considered invalid and will
    * be eventually removed.
@@ -357,10 +351,10 @@ public class StorageGroupProcessor {
     }
   }
 
-  public boolean insert(InsertPlan insertPlan) throws QueryProcessorException {
+  public void insert(InsertPlan insertPlan) throws QueryProcessorException {
     // reject insertions that are out of ttl
     if (!checkTTL(insertPlan.getTime())) {
-     throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL));
+      throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL));
     }
     writeLock();
     try {
@@ -369,7 +363,7 @@ public class StorageGroupProcessor {
       latestFlushedTimeForEachDevice.putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);
 
       // insert to sequence or unSequence file
-      return insertToTsFileProcessor(insertPlan,
+      insertToTsFileProcessor(insertPlan,
           insertPlan.getTime() > latestFlushedTimeForEachDevice.get(insertPlan.getDeviceId()));
     } finally {
       writeUnlock();
@@ -417,8 +411,6 @@ public class StorageGroupProcessor {
   }
 
   /**
-   *
-   * @param time
    * @return whether the given time falls in ttl
    */
   private boolean checkTTL(long time) {
@@ -458,7 +450,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  private boolean insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)
+  private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)
       throws QueryProcessorException {
     TsFileProcessor tsFileProcessor;
     boolean result;
@@ -466,7 +458,7 @@ public class StorageGroupProcessor {
     tsFileProcessor = getOrCreateTsFileProcessor(sequence);
 
     if (tsFileProcessor == null) {
-      return false;
+      return;
     }
 
     // insert TsFileProcessor
@@ -489,7 +481,6 @@ public class StorageGroupProcessor {
         tsFileProcessor.asyncFlush();
       }
     }
-    return result;
   }
 
   private TsFileProcessor getOrCreateTsFileProcessor(boolean sequence) {
@@ -830,9 +821,6 @@ public class StorageGroupProcessor {
   }
 
   /**
-   *
-   * @param tsFileResource
-   * @param deviceId
    * @return true if the device is contained in the TsFile and it lives beyond TTL
    */
   private boolean testResourceDevice(TsFileResource tsFileResource, String deviceId) {
@@ -993,7 +981,8 @@ public class StorageGroupProcessor {
 
       long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
       long timeLowerBound = System.currentTimeMillis() - dataTTL;
-      MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList, timeLowerBound);
+      MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList,
+          timeLowerBound);
 
       IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource);
       try {
@@ -1328,12 +1317,6 @@ public class StorageGroupProcessor {
     return workSequenceTsFileProcessor;
   }
 
-  @FunctionalInterface
-  public interface CloseTsFileCallBack {
-
-    void call(TsFileProcessor caller) throws TsFileProcessorException, IOException;
-  }
-
   public void setDataTTL(long dataTTL) {
     this.dataTTL = dataTTL;
     checkFilesTTL();
@@ -1353,4 +1336,10 @@ public class StorageGroupProcessor {
     LOAD_SEQUENCE, LOAD_UNSEQUENCE
   }
 
+  @FunctionalInterface
+  public interface CloseTsFileCallBack {
+
+    void call(TsFileProcessor caller) throws TsFileProcessorException, IOException;
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java
index 5e42492..66b41df 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java
@@ -198,9 +198,8 @@ public abstract class AbstractQueryProcessExecutor implements IQueryProcessExecu
   }
 
   @Override
-  public boolean delete(DeletePlan deletePlan) throws ProcessorException {
+  public void delete(DeletePlan deletePlan) throws ProcessorException {
     try {
-      boolean result = true;
       MManager mManager = MManager.getInstance();
       Set<String> existingPaths = new HashSet<>();
       for (Path p : deletePlan.getPaths()) {
@@ -217,9 +216,8 @@ public abstract class AbstractQueryProcessExecutor implements IQueryProcessExecu
         }
       }
       for (String path : existingPaths) {
-        result &= delete(new Path(path), deletePlan.getDeleteTime());
+        delete(new Path(path), deletePlan.getDeleteTime());
       }
-      return result;
     } catch (MetadataErrorException e) {
       throw new ProcessorException(e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
index f8f4de2..acc92cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -86,34 +86,31 @@ public interface IQueryProcessExecutor {
    * @param startTime start time in update command
    * @param endTime end time in update command
    * @param value - in type of string
-   * @return - whether the operator is successful.
    */
-  boolean update(Path path, long startTime, long endTime, String value)
+  void update(Path path, long startTime, long endTime, String value)
       throws ProcessorException;
 
   /**
    * execute delete command and return whether the operator is successful.
    *
    * @param deletePlan physical delete plan
-   * @return - whether the operator is successful.
    */
-  boolean delete(DeletePlan deletePlan) throws ProcessorException;
+  void delete(DeletePlan deletePlan) throws ProcessorException;
 
   /**
    * execute delete command and return whether the operator is successful.
    *
    * @param path : delete series seriesPath
    * @param deleteTime end time in delete command
-   * @return - whether the operator is successful.
    */
-  boolean delete(Path path, long deleteTime) throws ProcessorException;
+  void delete(Path path, long deleteTime) throws ProcessorException;
 
   /**
    * execute insert command and return whether the operator is successful.
    *
-   * @return - Operate Type.
+   * @param insertPlan physical insert plan
    */
-  boolean insert(InsertPlan insertPlan) throws ProcessorException;
+  void insert(InsertPlan insertPlan) throws ProcessorException;
 
   /**
    * execute batch insert plan
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index f90c3c4..d3d6b64 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -95,16 +95,17 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
   public boolean processNonQuery(PhysicalPlan plan) throws ProcessorException {
     switch (plan.getOperatorType()) {
       case DELETE:
-        return delete((DeletePlan) plan);
+        delete((DeletePlan) plan);
+        return true;
       case UPDATE:
         UpdatePlan update = (UpdatePlan) plan;
-        boolean flag = true;
         for (Pair<Long, Long> timePair : update.getIntervals()) {
-          flag &= update(update.getPath(), timePair.left, timePair.right, update.getValue());
+          update(update.getPath(), timePair.left, timePair.right, update.getValue());
         }
-        return flag;
+        return true;
       case INSERT:
-        return insert((InsertPlan) plan);
+        insert((InsertPlan) plan);
+        return true;
       case CREATE_ROLE:
       case DELETE_ROLE:
       case CREATE_USER:
@@ -194,13 +195,12 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
   }
 
   @Override
-  public boolean update(Path path, long startTime, long endTime, String value)
+  public void update(Path path, long startTime, long endTime, String value)
       throws ProcessorException {
-    return false;
   }
 
   @Override
-  public boolean delete(Path path, long timestamp) throws ProcessorException {
+  public void delete(Path path, long timestamp) throws ProcessorException {
     String deviceId = path.getDevice();
     String measurementId = path.getMeasurement();
     try {
@@ -210,7 +210,6 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
       }
       mManager.getStorageGroupNameByPath(path.getFullPath());
       storageEngine.delete(deviceId, measurementId, timestamp);
-      return true;
     } catch (StorageGroupException | StorageEngineException e) {
       throw new ProcessorException(e);
     }
@@ -218,7 +217,7 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
 
 
   @Override
-  public boolean insert(InsertPlan insertPlan) throws ProcessorException {
+  public void insert(InsertPlan insertPlan) throws ProcessorException {
     try {
       String[] measurementList = insertPlan.getMeasurements();
       String deviceId = insertPlan.getDeviceId();
@@ -254,8 +253,7 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
         dataTypes[i] = measurementNode.getSchema().getType();
       }
       insertPlan.setDataTypes(dataTypes);
-      return storageEngine.insert(insertPlan);
-
+      storageEngine.insert(insertPlan);
     } catch (PathErrorException | StorageEngineException | MetadataErrorException | CacheException e) {
       throw new ProcessorException(e);
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 4bcbcc0..b3072a3 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -128,7 +128,7 @@ public class TTLTest {
     insertPlan.setDataTypes(new TSDataType[]{TSDataType.INT64});
 
     // ok without ttl
-    assertTrue(storageGroupProcessor.insert(insertPlan));
+    storageGroupProcessor.insert(insertPlan);
 
     storageGroupProcessor.setDataTTL(1000);
     // with ttl
@@ -141,7 +141,7 @@ public class TTLTest {
     }
     assertTrue(caught);
     insertPlan.setTime(System.currentTimeMillis() - 900);
-    assertTrue(storageGroupProcessor.insert(insertPlan));
+    storageGroupProcessor.insert(insertPlan);
   }
 
   private void prepareData() throws QueryProcessorException {
@@ -156,7 +156,7 @@ public class TTLTest {
     // sequence data
     for (int i = 1000; i < 2000; i++) {
       insertPlan.setTime(initTime - 2000 + i);
-      assertTrue(storageGroupProcessor.insert(insertPlan));
+      storageGroupProcessor.insert(insertPlan);
       if ((i + 1) % 300 == 0) {
         storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java b/server/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index a826e7e..4d719f2 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -87,16 +87,17 @@ public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
   public boolean processNonQuery(PhysicalPlan plan) throws ProcessorException {
     switch (plan.getOperatorType()) {
       case DELETE:
-        return delete((DeletePlan) plan);
+        delete((DeletePlan) plan);
+        return true;
       case UPDATE:
         UpdatePlan update = (UpdatePlan) plan;
-        boolean flag = true;
         for (Pair<Long, Long> timePair : update.getIntervals()) {
-          flag &= update(update.getPath(), timePair.left, timePair.right, update.getValue());
+          update(update.getPath(), timePair.left, timePair.right, update.getValue());
         }
-        return flag;
+        return true;
       case INSERT:
-        return insert((InsertPlan) plan);
+        insert((InsertPlan) plan);
+        return true;
       default:
         throw new UnsupportedOperationException();
     }
@@ -138,10 +139,9 @@ public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
   }
 
   @Override
-  public boolean update(Path path, long startTime, long endTime, String value) {
+  public void update(Path path, long startTime, long endTime, String value) {
     if (!demoMemDataBase.containsKey(path.toString())) {
       LOG.warn("no series:{}", path);
-      return false;
     }
     TestSeries series = demoMemDataBase.get(path.toString());
     for (Entry<Long, Integer> entry : series.data.entrySet()) {
@@ -151,13 +151,12 @@ public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
       }
     }
     LOG.info("update, series:{}, time range:<{},{}>, value:{}", path, startTime, endTime, value);
-    return true;
   }
 
   @Override
-  public boolean delete(Path path, long deleteTime) {
+  public void delete(Path path, long deleteTime) {
     if (!demoMemDataBase.containsKey(path.toString())) {
-      return true;
+      return;
     }
     TestSeries series = demoMemDataBase.get(path.toString());
     TreeMap<Long, Integer> delResult = new TreeMap<>();
@@ -169,7 +168,6 @@ public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
     }
     series.data = delResult;
     LOG.info("delete series:{}, timestamp:{}", path, deleteTime);
-    return true;
   }
 
   @Override
@@ -182,16 +180,17 @@ public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
   }
 
   @Override
-  public boolean insert(InsertPlan insertPlan) {
+  public void insert(InsertPlan insertPlan) {
     for (int i = 0; i < insertPlan.getMeasurements().length; i++) {
-      String strPath = insertPlan.getDeviceId() + IoTDBConstant.PATH_SEPARATOR + insertPlan.getMeasurements()[i];
+      String strPath =
+          insertPlan.getDeviceId() + IoTDBConstant.PATH_SEPARATOR + insertPlan.getMeasurements()[i];
       if (!demoMemDataBase.containsKey(strPath)) {
         demoMemDataBase.put(strPath, new TestSeries());
       }
-      demoMemDataBase.get(strPath).data.put(insertPlan.getTime(), Integer.valueOf(insertPlan.getValues()[i]));
+      demoMemDataBase.get(strPath).data
+          .put(insertPlan.getTime(), Integer.valueOf(insertPlan.getValues()[i]));
       timeStampUnion.add(insertPlan.getTime());
     }
-    return true;
   }
 
   @Override