Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/03/17 07:46:58 UTC

[iotdb] branch rocksdb/dev created (now 1589801)

This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a change to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 1589801  fix bug where storage groups were displayed incorrectly by show devices

This branch includes the following new commits:

     new 5d015a1  use wildcard to query
     new abc659c  Merge branch 'rocksdb/dev' of v.src.corp.qihoo.net:xt_hadoop/iotdb into rocksdb/dev
     new 4468993  updated the matched interface
     new a63cc7d  traverser changed to concurrency
     new 68f9c0d  fix a bug where the number of nodes is incorrect
     new 4821372  fix bug of full path query
     new e6e7bc0  [rocksdb] complete metadata transfer from mmanage to mrockdb
     new 06fe60e1 [rocksdb] complete data transfer
     new f15a6d1  spotless apply
     new c2d9d58  [rocksdb] refine key exist check logic
     new aeb546b  update the method of getting the schema
     new bfb0124  support showTimeseries
     new a809ae3  update the way of getting measurementPath
     new 4fa0f53  change the parameter type in function
     new 9640cf1  fix timeout issue when obtaining lock
     new 769ce5a  incorrect response to set storage group
     new 535dd2c  update the exception information in the log
     new 0f326e9  implement show devices
     new 311ad36  fix incorrect storagegroup name in show timeseries
     new c1d4303  [RcoksDB] refine delete timeseries logic
     new 40d09a4  [rocksdb] fix the bug that upsert alias fail, refine delete timeseries logic
     new 61d9edd  spotless apply
     new a54d080  fix some bugs
     new a543966  fix npe when getBelongedToSG
     new 399452f  Fix a bug where the result is invalid
     new 9ae48a2  fix a bug in wildcard processing
     new 126a31a  remove test info
     new 115c5a5  modify data transfer task
     new 1995926  fix a bug where storage groups could not be created automatically
     new 01736f1  update exception info
     new c9e84b8  fix the bug of deleting storage groups
     new 7f181c0  use max value replace magic number
     new e1fb08f  [RocksDB] code refine
     new a76b5f1  [rocksdb] fix build break
     new 16ec277  fix npe when init sgMNode
     new a068a41  fix bug of creating aligned timeseries
     new 25421c3  fix npe when query aligned timeseries
     new 2b12d68  update
     new 95d746c  delete unused method
     new ccd5604  Fix the bug that path in PathNotExistException is incomplete when do insert
     new 4a39a9e  support show child
     new 3cbf605  Merge branch 'rocksdb/dev' of v.src.corp.qihoo.net:xt_hadoop/iotdb into rocksdb/dev
     new 1599164  print exception info
     new 83da91c  fix bug of the incorrect number by counting nodes
     new 1589801  fix bug where storage groups were displayed incorrectly by show devices

The 45 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 29/45: fix a bug where storage groups could not be created automatically

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1995926ca69570954bfadec0a4a0c8f29926738e
Author: lisijia <li...@360.cn>
AuthorDate: Fri Mar 11 10:39:12 2022 +0800

    fix a bug where storage groups could not be created automatically
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 25 +++++++++++-----------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 6eeb457..19c972d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -363,7 +363,7 @@ public class MRocksDBManager implements IMetaManager {
     // sg check and create
     String[] nodes = path.getNodes();
     SchemaUtils.checkDataTypeWithEncoding(schema.getType(), schema.getEncodingType());
-    int sgIndex = ensureStorageGroup(path, path.getNodeLength() - 1);
+    int sgIndex = ensureStorageGroup(path);
 
     try {
       createTimeSeriesRecursively(
@@ -535,7 +535,7 @@ public class MRocksDBManager implements IMetaManager {
       MetaFormatUtils.checkNodeName(measurements.get(i));
     }
 
-    int sgIndex = ensureStorageGroup(prefixPath, prefixPath.getNodeLength() - 1);
+    int sgIndex = ensureStorageGroup(prefixPath);
 
     try {
       createEntityRecursively(
@@ -610,6 +610,13 @@ public class MRocksDBManager implements IMetaManager {
           }
         } else {
           if (start == nodes.length) {
+
+            // make sure sg node and entity node are different
+            // eg.,'root.a' is a storage group path, 'root.a.b' can not be a timeseries
+            if (checkResult.getResult(RocksDBMNodeType.STORAGE_GROUP)) {
+              throw new MetadataException("Storage Group Node and Entity Node could not be same!");
+            }
+
             if (!checkResult.getResult(RocksDBMNodeType.ENTITY)) {
               throw new PathAlreadyExistException("Node already exists but not entity");
             }
@@ -765,7 +772,7 @@ public class MRocksDBManager implements IMetaManager {
 
   // region Interfaces and Implementation for StorageGroup and TTL operation
   // including sg set and delete, and ttl set
-  private int ensureStorageGroup(PartialPath path, int entityIndex) throws MetadataException {
+  private int ensureStorageGroup(PartialPath path) throws MetadataException {
     int sgIndex = -1;
     String[] nodes = path.getNodes();
     try {
@@ -776,9 +783,6 @@ public class MRocksDBManager implements IMetaManager {
         }
         PartialPath sgPath =
             MetaUtils.getStorageGroupPathByLevel(path, config.getDefaultStorageGroupLevel());
-        if ((entityIndex - sgPath.getNodeLength()) < 1) {
-          throw new MetadataException("Storage Group Node and Entity Node could not be same!");
-        }
         setStorageGroup(sgPath);
         sgIndex = sgPath.getNodeLength() - 1;
       }
@@ -790,12 +794,6 @@ public class MRocksDBManager implements IMetaManager {
     } catch (RocksDBException e) {
       throw new MetadataException(e);
     }
-
-    // make sure sg node and entity node are different
-    if ((entityIndex - sgIndex) < 1) {
-      throw new MetadataException("Storage Group Node and Entity Node could not be same!");
-    }
-
     return sgIndex;
   }
 
@@ -1834,6 +1832,7 @@ public class MRocksDBManager implements IMetaManager {
   /** Get storage group node by path. the give path don't need to be storage group path. */
   @Override
   public IStorageGroupMNode getStorageGroupNodeByPath(PartialPath path) throws MetadataException {
+    ensureStorageGroup(path);
     IStorageGroupMNode node = null;
     try {
       String[] nodes = path.getNodes();
@@ -2508,7 +2507,7 @@ public class MRocksDBManager implements IMetaManager {
       node = getDeviceNode(devicePath);
       return node;
     } catch (PathNotExistException e) {
-      int sgIndex = ensureStorageGroup(devicePath, devicePath.getNodeLength() - 1);
+      int sgIndex = ensureStorageGroup(devicePath);
       if (!config.isAutoCreateSchemaEnabled()) {
         throw new PathNotExistException(devicePath.getFullPath());
       }

[iotdb] 03/45: updated the matched interface

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 446899388c984b1692a9cabe6584066bf674bf74
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 2 11:17:06 2022 +0800

    updated the matched interface
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 34 +++++++++++++++-------
 1 file changed, 24 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index e009dfc..fc7a066 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -98,6 +98,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -1047,7 +1048,8 @@ public class MRocksDBManager implements IMetaManager {
                             if (RocksDBUtils.suffixMatch(iterator.key(), suffixToMatch)) {
                               if (lastIteration) {
                                 System.out.println("matched key: " + new String(iterator.key()));
-                                consumer.accept(new String(iterator.key()));
+                                consumer.accept(
+                                    RocksDBUtils.getPathByInnerName(new String(iterator.key())));
                               } else {
                                 tempNodes.add(RocksDBUtils.toMetaNodes(iterator.key()));
                               }
@@ -1180,13 +1182,7 @@ public class MRocksDBManager implements IMetaManager {
 
   private int getCountByNodeType(Character[] nodetype, String[] nodes) throws IllegalPathException {
     AtomicInteger atomicInteger = new AtomicInteger(0);
-    Consumer<String> consumer =
-        new Consumer<String>() {
-          @Override
-          public void accept(String s) {
-            atomicInteger.incrementAndGet();
-          }
-        };
+    Consumer<String> consumer = s -> atomicInteger.incrementAndGet();
 
     replaceMultiWildcard(nodes, MAX_PATH_DEPTH, consumer, nodetype);
     return atomicInteger.get();
@@ -1433,7 +1429,22 @@ public class MRocksDBManager implements IMetaManager {
   @Override
   public List<PartialPath> getMatchedStorageGroups(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
-    return new ArrayList<>(getMatchedPathWithNodeType(isPrefixMatch, pathPattern, NODE_TYPE_SG));
+    List<PartialPath> allPath = new ArrayList<>();
+    getMatchedPathByNodeType(pathPattern.getNodes(), new Character[] {NODE_TYPE_SG}, allPath);
+    return allPath;
+  }
+
+  private void getMatchedPathByNodeType(
+      String[] nodes, Character[] nodetype, Collection<PartialPath> collection)
+      throws IllegalPathException {
+    List<String> allResult = Collections.synchronizedList(new ArrayList<>());
+    Consumer<String> consumer = allResult::add;
+
+    replaceMultiWildcard(nodes, MAX_PATH_DEPTH, consumer, nodetype);
+
+    for (String path : allResult) {
+      collection.add(new PartialPath(path));
+    }
   }
 
   /** Get all storage group paths */
@@ -1497,7 +1508,10 @@ public class MRocksDBManager implements IMetaManager {
   @Override
   public Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
-    return getMatchedPathWithNodeType(isPrefixMatch, pathPattern, NODE_TYPE_ENTITY);
+
+    Set<PartialPath> allPath = new HashSet<>();
+    getMatchedPathByNodeType(pathPattern.getNodes(), new Character[] {NODE_TYPE_SG}, allPath);
+    return allPath;
   }
 
   private Set<PartialPath> getMatchedPathWithNodeType(
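
The new getMatchedPathByNodeType collects results through a Consumer that the traversal invokes from a parallel stream, so matches go into a Collections.synchronizedList (the counting variant above uses an AtomicInteger for the same reason). A self-contained sketch of that pattern, where a parallel stream over candidate paths stands in for replaceMultiWildcard/traverseByPatternPath and the filter is purely illustrative:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.function.Consumer;

    public class ParallelMatchCollector {

      // The consumer must target a thread-safe collection because forEach here
      // runs on multiple worker threads.
      static List<String> collectMatches(List<String> candidates, String suffix) {
        List<String> matched = Collections.synchronizedList(new ArrayList<>());
        Consumer<String> consumer = matched::add;
        candidates.parallelStream().filter(p -> p.endsWith(suffix)).forEach(consumer);
        return new ArrayList<>(matched); // copy once the parallel work has finished
      }

      public static void main(String[] args) {
        List<String> paths = Arrays.asList("root.sg.d1.m1", "root.sg.d2.m1", "root.sg.d1.m2");
        System.out.println(collectMatches(paths, ".m1")); // two matches
      }
    }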

[iotdb] 27/45: remove test info

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 126a31ae25db2bccc5078e242442c044daf6676b
Author: lisijia <li...@360.cn>
AuthorDate: Thu Mar 10 17:52:47 2022 +0800

    remove test info
---
 .../java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java  | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 695e0bb..6eeb457 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -1006,11 +1006,6 @@ public class MRocksDBManager implements IMetaManager {
       Character[] nodeTypeArray)
       throws IllegalPathException {
     List<String[]> allNodesArray = RocksDBUtils.replaceMultiWildcardToSingle(nodes, maxLevel);
-    try {
-      readWriteHandler.scanAllKeys("E:\\ideaProject\\rel-360\\all.txt");
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
     allNodesArray.parallelStream().forEach(x -> traverseByPatternPath(x, function, nodeTypeArray));
   }
 
@@ -1839,7 +1834,6 @@ public class MRocksDBManager implements IMetaManager {
   /** Get storage group node by path. the give path don't need to be storage group path. */
   @Override
   public IStorageGroupMNode getStorageGroupNodeByPath(PartialPath path) throws MetadataException {
-    ensureStorageGroup(path, path.getNodeLength() - 1);
     IStorageGroupMNode node = null;
     try {
       String[] nodes = path.getNodes();

[iotdb] 10/45: [rocksdb] refine key exist check logic

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c2d9d588ca085e6f944afdfc7d0506cf8590788e
Author: chengjianyun <ch...@360.cn>
AuthorDate: Mon Mar 7 11:47:28 2022 +0800

    [rocksdb] refine key exist check logic
---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  3 +
 .../iotdb/db/metadata/rocksdb/CheckKeyResult.java  |  9 +++
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 11 ++-
 .../metadata/rocksdb/RocksDBReadWriteHandler.java  | 81 +++++++++-------------
 .../rocksdb/RocksDBReadWriteHandlerTest.java       | 57 +++++++++++++++
 5 files changed, 106 insertions(+), 55 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5751a45..85383e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -849,6 +849,9 @@ public class IoTDBDescriptor {
       // CQ
       loadCQProps(properties);
 
+      // meta manager
+      loadMetadataConfig(properties);
+
     } catch (FileNotFoundException e) {
       logger.warn("Fail to find config file {}", url, e);
     } catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/CheckKeyResult.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/CheckKeyResult.java
index 5f6ce05..25f9ab1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/CheckKeyResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/CheckKeyResult.java
@@ -25,6 +25,7 @@ public class CheckKeyResult {
 
   private boolean[] result = new boolean[MAX_NODE_TYPE_NUM];
   private boolean existAnyKey = false;
+  private byte[] value;
 
   public void setSingleCheckValue(char index, boolean value) {
     if (value) {
@@ -37,6 +38,14 @@ public class CheckKeyResult {
     return existAnyKey;
   }
 
+  public byte[] getValue() {
+    return value;
+  }
+
+  public void setValue(byte[] value) {
+    this.value = value;
+  }
+
   public boolean getResult(RocksDBMNodeType type) {
     return result[type.value];
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 9ca93ad..ff41b13 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -120,7 +120,6 @@ import java.util.stream.Collectors;
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
-import static org.apache.iotdb.db.metadata.rocksdb.RocksDBUtils.*;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 /**
@@ -375,12 +374,11 @@ public class MRocksDBManager implements IMetaManager {
       return;
     }
     String levelPath = RocksDBUtils.getLevelPath(nodes, start - 1);
-    Holder<byte[]> holder = new Holder<>();
     Lock lock = locksPool.get(levelPath);
     if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
       lockedLocks.push(lock);
       try {
-        CheckKeyResult checkResult = readWriteHandler.keyExistByAllTypes(levelPath, holder);
+        CheckKeyResult checkResult = readWriteHandler.keyExistByAllTypes(levelPath);
         if (!checkResult.existAnyKey()) {
           createTimeSeriesRecursively(
               nodes, start - 1, end, schema, alias, tags, attributes, lockedLocks);
@@ -406,7 +404,7 @@ public class MRocksDBManager implements IMetaManager {
               // convert the parent node to entity if it is internal node
               readWriteHandler.convertToEntityNode(levelPath, DEFAULT_NODE_VALUE);
             } else if (checkResult.getResult(RocksDBMNodeType.ENTITY)) {
-              if ((holder.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
+              if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
                 throw new AlignedTimeseriesException(
                     "Timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
                     levelPath);
@@ -582,11 +580,10 @@ public class MRocksDBManager implements IMetaManager {
       return;
     }
     String levelPath = RocksDBUtils.getLevelPath(nodes, start - 1);
-    Holder<byte[]> holder = new Holder<>();
     Lock lock = locksPool.get(levelPath);
     if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
       try {
-        CheckKeyResult checkResult = readWriteHandler.keyExistByAllTypes(levelPath, holder);
+        CheckKeyResult checkResult = readWriteHandler.keyExistByAllTypes(levelPath);
         if (!checkResult.existAnyKey()) {
           createEntityRecursively(nodes, start - 1, end, aligned, lockedLocks);
           if (start == nodes.length) {
@@ -602,7 +599,7 @@ public class MRocksDBManager implements IMetaManager {
               throw new PathAlreadyExistException("Node already exists but not entity");
             }
 
-            if ((holder.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
+            if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
               throw new PathAlreadyExistException("Entity node exists but not aligned");
             }
           } else if (checkResult.getResult(RocksDBMNodeType.MEASUREMENT)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
index 9ab5f42..0e701ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import com.google.common.primitives.Bytes;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
@@ -88,7 +89,7 @@ public class RocksDBReadWriteHandler {
     RocksDB.loadLibrary();
   }
 
-  private RocksDBReadWriteHandler() throws RocksDBException {
+  public RocksDBReadWriteHandler() throws RocksDBException {
     Options options = new Options();
     options.setCreateIfMissing(true);
     options.setAllowMmapReads(true);
@@ -165,8 +166,9 @@ public class RocksDBReadWriteHandler {
     rocksDB.put(key, value);
   }
 
-  public void createNode(String key, RocksDBMNodeType type, byte[] value) throws RocksDBException {
-    byte[] nodeKey = RocksDBUtils.toRocksDBKey(key, type.value);
+  public void createNode(String levelKey, RocksDBMNodeType type, byte[] value)
+      throws RocksDBException {
+    byte[] nodeKey = RocksDBUtils.toRocksDBKey(levelKey, type.value);
     rocksDB.put(nodeKey, value);
   }
 
@@ -220,11 +222,6 @@ public class RocksDBReadWriteHandler {
   }
 
   public CheckKeyResult keyExistByAllTypes(String levelKey) throws RocksDBException {
-    return keyExistByAllTypes(levelKey, new Holder<>());
-  }
-
-  public CheckKeyResult keyExistByAllTypes(String levelKey, Holder<byte[]> holder)
-      throws RocksDBException {
     RocksDBMNodeType[] types =
         new RocksDBMNodeType[] {
           RocksDBMNodeType.ALISA,
@@ -233,50 +230,36 @@ public class RocksDBReadWriteHandler {
           RocksDBMNodeType.MEASUREMENT,
           RocksDBMNodeType.STORAGE_GROUP
         };
-    return keyExistByTypes(levelKey, holder, types);
+    return keyExistByTypes(levelKey, types);
   }
 
   public CheckKeyResult keyExistByTypes(String levelKey, RocksDBMNodeType... types)
       throws RocksDBException {
-    return keyExistByTypes(levelKey, new Holder<>(), types);
-  }
-
-  public CheckKeyResult keyExistByTypes(
-      String levelKey, Holder<byte[]> holder, RocksDBMNodeType... types) throws RocksDBException {
-    // TODO: compare the performance between two methods
     CheckKeyResult result = new CheckKeyResult();
-    for (RocksDBMNodeType type : types) {
-      byte[] key = RocksDBUtils.toRocksDBKey(levelKey, type.value);
-      if (keyExist(key, holder)) {
-        result.setSingleCheckValue(type.value, keyExist(key, holder));
-        break;
+    try {
+      Arrays.stream(types)
+          //          .parallel()
+          .forEach(
+              x -> {
+                byte[] key = Bytes.concat(new byte[] {(byte) x.value}, levelKey.getBytes());
+                try {
+                  Holder<byte[]> holder = new Holder<>();
+                  boolean keyExisted = keyExist(key, holder);
+                  if (keyExisted) {
+                    result.setSingleCheckValue(x.value, true);
+                    result.setValue(holder.getValue());
+                  }
+                } catch (RocksDBException e) {
+                  throw new RuntimeException(e);
+                }
+              });
+    } catch (Exception e) {
+      if (e.getCause() instanceof RocksDBException) {
+        throw (RocksDBException) e.getCause();
       }
+      throw e;
     }
     return result;
-
-    //    try {
-    //      Arrays.stream(types)
-    //          .parallel()
-    //          .forEach(
-    //              x -> {
-    //                byte[] key = Bytes.concat(new byte[] {x.value}, levelKey.getBytes());
-    //                try {
-    //                  boolean keyExisted = keyExist(key, holder);
-    //                  if (keyExisted) {
-    //                    holder.getValue();
-    //                    result.setSingleCheckValue(x.value, true);
-    //                  }
-    //                } catch (RocksDBException e) {
-    //                  throw new RuntimeException(e);
-    //                }
-    //              });
-    //    } catch (Exception e) {
-    //      if (e.getCause() instanceof RocksDBException) {
-    //        throw (RocksDBException) e.getCause();
-    //      }
-    //      throw e;
-    //    }
-    //    return result;
   }
 
   public boolean keyExist(byte[] key, Holder<byte[]> holder) throws RocksDBException {
@@ -284,10 +267,12 @@ public class RocksDBReadWriteHandler {
     if (!rocksDB.keyMayExist(key, holder)) {
       exist = false;
     } else {
-      byte[] value = rocksDB.get(key);
-      if (value != null) {
-        exist = true;
-        holder.setValue(value);
+      if (holder.getValue() != null) {
+        byte[] value = rocksDB.get(key);
+        if (value != null) {
+          exist = true;
+          holder.setValue(value);
+        }
       }
     }
     return exist;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandlerTest.java
new file mode 100644
index 0000000..5f605c7
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandlerTest.java
@@ -0,0 +1,57 @@
+package org.apache.iotdb.db.metadata.rocksdb;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.rocksdb.RocksDBException;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.metadata.rocksdb.RocksDBReadWriteHandler.ROCKSDB_PATH;
+
+public class RocksDBReadWriteHandlerTest {
+
+  private RocksDBReadWriteHandler readWriteHandler;
+
+  @Before
+  public void setUp() throws MetadataException, RocksDBException {
+    File file = new File(ROCKSDB_PATH);
+    if (!file.exists()) {
+      file.mkdirs();
+    }
+    readWriteHandler = new RocksDBReadWriteHandler();
+  }
+
+  @Test
+  public void testKeyExistByTypes() throws IllegalPathException, RocksDBException {
+    List<PartialPath> timeseries = new ArrayList<>();
+    timeseries.add(new PartialPath("root.sg.d1.m1"));
+    timeseries.add(new PartialPath("root.sg.d1.m2"));
+    timeseries.add(new PartialPath("root.sg.d2.m1"));
+    timeseries.add(new PartialPath("root.sg.d2.m2"));
+    timeseries.add(new PartialPath("root.sg1.d1.m1"));
+    timeseries.add(new PartialPath("root.sg1.d1.m2"));
+    timeseries.add(new PartialPath("root.sg1.d2.m1"));
+    timeseries.add(new PartialPath("root.sg1.d2.m2"));
+
+    for (PartialPath path : timeseries) {
+      String levelPath = RocksDBUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+      readWriteHandler.createNode(
+          levelPath, RocksDBMNodeType.MEASUREMENT, path.getFullPath().getBytes());
+    }
+
+    for (PartialPath path : timeseries) {
+      String levelPath = RocksDBUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+      CheckKeyResult result = readWriteHandler.keyExistByAllTypes(levelPath);
+      Assert.assertTrue(result.existAnyKey());
+      Assert.assertNotNull(result.getValue());
+      Assert.assertEquals(path.getFullPath(), new String(result.getValue()));
+    }
+  }
+}
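
As the hunk above shows, keyExistByTypes builds one RocksDB key per node type by prepending the type byte to the level-encoded path (Bytes.concat) and probes each candidate. A standalone sketch of that key scheme under the same assumption; the type tags are placeholders for the project's RocksDBMNodeType values, and a plain get() replaces the keyMayExist optimization for brevity:

    import com.google.common.primitives.Bytes;

    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    import java.nio.charset.StandardCharsets;

    public class TypedKeyCheck {

      // Placeholder type tags; the real values live in RocksDBMNodeType.
      static final byte[] NODE_TYPES = {'I', 'G', 'D', 'M', 'A'};

      // Probe the same level-encoded path under every node-type prefix and return
      // the first type that is actually stored, or 0 if none is.
      static byte firstExistingType(RocksDB db, String levelKey) throws RocksDBException {
        byte[] pathBytes = levelKey.getBytes(StandardCharsets.UTF_8);
        for (byte type : NODE_TYPES) {
          byte[] key = Bytes.concat(new byte[] {type}, pathBytes);
          if (db.get(key) != null) {
            return type;
          }
        }
        return 0;
      }
    }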

[iotdb] 18/45: implement show devices

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0f326e902fe12952f9972e1990a172baf48d8073
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 9 17:04:58 2022 +0800

    implement show devices
---
 .../apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java   | 13 +++++++++++--
 .../org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java  |  9 +++++++++
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 8903cca..3161903 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -1503,7 +1503,7 @@ public class MRocksDBManager implements IMetaManager {
       throws MetadataException {
 
     Set<PartialPath> allPath = new HashSet<>();
-    getMatchedPathByNodeType(pathPattern.getNodes(), new Character[] {NODE_TYPE_SG}, allPath);
+    getMatchedPathByNodeType(pathPattern.getNodes(), new Character[] {NODE_TYPE_ENTITY}, allPath);
     return allPath;
   }
 
@@ -1530,7 +1530,16 @@ public class MRocksDBManager implements IMetaManager {
    */
   @Override
   public List<ShowDevicesResult> getMatchedDevices(ShowDevicesPlan plan) throws MetadataException {
-    return null;
+    List<ShowDevicesResult> res = Collections.synchronizedList(new ArrayList<>());
+    BiFunction<byte[], byte[], Boolean> function =
+        (a, b) -> {
+          String fullPath = RocksDBUtils.getPathByInnerName(new String(a));
+          res.add(new ShowDevicesResult(fullPath, RocksDBUtils.isAligned(b)));
+          return true;
+        };
+    traverseOutcomeBasins(
+        plan.getPath().getNodes(), MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_ENTITY});
+    return res;
   }
   // endregion
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index 2bf01f5..ee0c29b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -58,6 +58,7 @@ import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ESCAPE_PATH_S
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_ALIAS;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_ATTRIBUTES;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_TAGS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ALIAS;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
@@ -338,6 +339,14 @@ public class RocksDBUtils {
     return obj;
   }
 
+  public static boolean isAligned(byte[] value) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(value);
+    // skip the version flag and node type flag
+    ReadWriteIOUtils.readByte(byteBuffer);
+    // get block type
+    byte flag = ReadWriteIOUtils.readByte(byteBuffer);
+    return (flag & FLAG_IS_ALIGNED) > 0;
+  }
   /**
    * get inner name by converting partial path.
    *
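
The new isAligned helper skips a leading byte of the stored node value and tests a bit in the following flag byte. A standalone sketch of that decoding, with an assumed value layout and an illustrative flag constant (the real FLAG_IS_ALIGNED lives in RockDBConstants):

    import java.nio.ByteBuffer;

    public class NodeValueFlags {

      // Illustrative bit value; not the project's actual constant.
      static final byte FLAG_IS_ALIGNED = 0x01;

      // Assumed value layout: [version byte][flag byte][data blocks ...]
      static boolean isAligned(byte[] value) {
        ByteBuffer buffer = ByteBuffer.wrap(value);
        buffer.get();             // skip the version byte
        byte flag = buffer.get(); // flag byte carrying the aligned bit
        return (flag & FLAG_IS_ALIGNED) != 0;
      }

      public static void main(String[] args) {
        System.out.println(isAligned(new byte[] {0x01, FLAG_IS_ALIGNED})); // true
        System.out.println(isAligned(new byte[] {0x01, 0x00}));            // false
      }
    }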

[iotdb] 31/45: fix the bug of deleting storage groups

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c9e84b8f9918dae8a3192549731333ba9d3e22cd
Author: lisijia <li...@360.cn>
AuthorDate: Fri Mar 11 16:19:00 2022 +0800

    fix the bug of deleting storage groups
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 28 ++++++++++++----------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 8139977..c0dd427 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -882,24 +882,26 @@ public class MRocksDBManager implements IMetaManager {
                 // wait for all executing createTimeseries operations are complete
                 Thread.sleep(MAX_LOCK_WAIT_TIME * MAX_PATH_DEPTH);
                 String[] nodes = path.getNodes();
-                Arrays.asList(ALL_NODE_TYPE_ARRAY).stream()
+                Arrays.stream(ALL_NODE_TYPE_ARRAY)
                     .parallel()
                     .forEach(
                         type -> {
                           try {
-                            String startPath =
-                                RocksDBUtils.getLevelPathPrefix(
-                                    nodes, nodes.length - 1, nodes.length);
-                            byte[] startKey = RocksDBUtils.toRocksDBKey(startPath, type);
-                            String endPath =
-                                RocksDBUtils.getLevelPathPrefix(
-                                    nodes, nodes.length - 1, MAX_PATH_DEPTH);
-                            byte[] endKey = RocksDBUtils.toRocksDBKey(endPath, type);
-                            if (type == NODE_TYPE_MEASUREMENT) {
-                              readWriteHandler.deleteNodeByPrefix(
-                                  readWriteHandler.getCFHByName(TABLE_NAME_TAGS), startKey, endKey);
+                            for (int i = nodes.length; i <= MAX_PATH_DEPTH; i++) {
+                              String startPath =
+                                  RocksDBUtils.getLevelPathPrefix(nodes, nodes.length - 1, i);
+                              byte[] startKey = RocksDBUtils.toRocksDBKey(startPath, type);
+                              byte[] endKey = new byte[startKey.length];
+                              System.arraycopy(startKey, 0, endKey, 0, startKey.length - 1);
+                              endKey[endKey.length - 1] = 127;
+                              if (type == NODE_TYPE_MEASUREMENT) {
+                                readWriteHandler.deleteNodeByPrefix(
+                                    readWriteHandler.getCFHByName(TABLE_NAME_TAGS),
+                                    startKey,
+                                    endKey);
+                              }
+                              readWriteHandler.deleteNodeByPrefix(startKey, endKey);
                             }
-                            readWriteHandler.deleteNodeByPrefix(startKey, endKey);
                           } catch (RocksDBException e) {
                             logger.error("delete storage error {}", path.getFullPath(), e);
                           }
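
The fix above walks every level from the storage group's depth up to MAX_PATH_DEPTH and deletes the key range for each node type, building the end key by copying the start key and setting its last byte to 127 (replaced by Byte.MAX_VALUE in commit 32/45). A standalone sketch of that range construction against the public RocksDB Java API; deleteNodeByPrefix is assumed to wrap something similar, and the demo keys are purely illustrative:

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class PrefixRangeDelete {

      // Delete every key in [startKey, startKey-with-last-byte-set-to-0x7F).
      // With ASCII-encoded keys, every key starting with the prefix sorts
      // inside that range, so the whole subtree at this level is removed.
      static void deleteByPrefix(RocksDB db, byte[] startKey) throws RocksDBException {
        byte[] endKey = Arrays.copyOf(startKey, startKey.length);
        endKey[endKey.length - 1] = Byte.MAX_VALUE;
        db.deleteRange(startKey, endKey);
      }

      public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
            RocksDB db = RocksDB.open(options, "/tmp/prefix-delete-demo")) {
          db.put("2root.2sg.2d1".getBytes(StandardCharsets.UTF_8), new byte[] {0});
          db.put("2root.2sg.2d2".getBytes(StandardCharsets.UTF_8), new byte[] {0});
          deleteByPrefix(db, "2root.2sg.".getBytes(StandardCharsets.UTF_8));
        }
      }
    }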

[iotdb] 28/45: modify data transfer task

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 115c5a57261a8af61f823cb4fbfa19496d5b13e5
Author: chengjianyun <ch...@360.cn>
AuthorDate: Fri Mar 11 09:24:33 2022 +0800

    modify data transfer task
---
 .../db/metadata/rocksdb/MetaDataTransfer.java      | 49 ++++++++++++++++++----
 1 file changed, 40 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
index 89e59d6..7931e84 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
@@ -45,10 +45,14 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -76,6 +80,9 @@ public class MetaDataTransfer {
           + MetadataConstant.METADATA_LOG
           + ".transfer_failed";
 
+  private String idxFilePath =
+      RocksDBReadWriteHandler.ROCKSDB_PATH + File.separator + "transfer_mlog.idx";
+
   private AtomicInteger failedPlanCount = new AtomicInteger(0);
   private List<PhysicalPlan> retryPlans = new ArrayList<>();
 
@@ -125,8 +132,18 @@ public class MetaDataTransfer {
     File logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
     // init the metadata from the operation log
     if (logFile.exists()) {
-      try (MLogReader mLogReader = new MLogReader(schemaDir, MetadataConstant.METADATA_LOG); ) {
-        transferFromMLog(mLogReader);
+      try (MLogReader mLogReader = new MLogReader(schemaDir, MetadataConstant.METADATA_LOG)) {
+        int startIdx = 0;
+        File idxFile = new File(idxFilePath);
+        if (idxFile.exists()) {
+          try (BufferedReader br = new BufferedReader(new FileReader(idxFile))) {
+            String idxStr = br.readLine();
+            if (StringUtils.isNotEmpty(idxStr)) {
+              startIdx = Integer.valueOf(idxStr);
+            }
+          }
+        }
+        transferFromMLog(mLogReader, startIdx);
       } catch (Exception e) {
         throw new IOException("Failed to parser mlog.bin for err:" + e);
       }
@@ -138,18 +155,22 @@ public class MetaDataTransfer {
     logger.info("Transfer metadata from MManager to MRocksDBManager complete!");
   }
 
-  private void transferFromMLog(MLogReader mLogReader)
+  private void transferFromMLog(MLogReader mLogReader, long startIdx)
       throws IOException, MetadataException, ExecutionException, InterruptedException {
     long time = System.currentTimeMillis();
-    int idx = 0;
+    logger.info("start from {} to transfer data from mlog.bin", startIdx);
+    int currentIdx = 0;
     PhysicalPlan plan;
     List<PhysicalPlan> nonCollisionCollections = new ArrayList<>();
     while (mLogReader.hasNext()) {
       try {
         plan = mLogReader.next();
-        idx++;
+        currentIdx++;
+        if (currentIdx <= startIdx) {
+          continue;
+        }
       } catch (Exception e) {
-        logger.error("Parse mlog error at lineNumber {} because:", idx, e);
+        logger.error("Parse mlog error at lineNumber {} because:", currentIdx, e);
         throw e;
       }
       if (plan == null) {
@@ -161,15 +182,14 @@ public class MetaDataTransfer {
         case CREATE_ALIGNED_TIMESERIES:
         case AUTO_CREATE_DEVICE_MNODE:
           nonCollisionCollections.add(plan);
-          if (nonCollisionCollections.size() > 100000) {
+          if (nonCollisionCollections.size() > DEFAULT_TRANSFER_PLANS_BUFFER_SIZE) {
             executeBufferedOperation(nonCollisionCollections);
           }
           break;
         case SET_STORAGE_GROUP:
-        case DELETE_TIMESERIES:
-        case DELETE_STORAGE_GROUP:
         case TTL:
         case CHANGE_ALIAS:
+        case DELETE_TIMESERIES:
           executeBufferedOperation(nonCollisionCollections);
           try {
             rocksDBManager.operation(plan);
@@ -179,6 +199,12 @@ public class MetaDataTransfer {
             logger.error("Can not operate cmd {} for err:", plan.getOperatorType(), e);
           }
           break;
+        case DELETE_STORAGE_GROUP:
+          DeleteStorageGroupPlan deleteStorageGroupPlan = (DeleteStorageGroupPlan) plan;
+          for (PartialPath path : deleteStorageGroupPlan.getPaths()) {
+            logger.info("delete storage group: {}", path.getFullPath());
+          }
+          break;
         case CHANGE_TAG_OFFSET:
         case CREATE_TEMPLATE:
         case DROP_TEMPLATE:
@@ -211,6 +237,11 @@ public class MetaDataTransfer {
         }
       }
     }
+
+    File idxFile = new File(idxFilePath);
+    try (FileWriter writer = new FileWriter(idxFile)) {
+      writer.write(String.valueOf(currentIdx));
+    }
     logger.info(
         "Transfer data from mlog.bin complete after {}ms with {} errors",
         System.currentTimeMillis() - time,
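
With this change the transfer task records the index of the last processed mlog entry in transfer_mlog.idx and skips that many entries on restart, so an interrupted transfer resumes instead of replaying from the beginning. A self-contained sketch of that checkpoint/resume pattern; the file name and the println standing in for rocksDBManager.operation(plan) are illustrative:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.List;

    public class ResumableReplay {

      // Illustrative checkpoint file; the patch writes transfer_mlog.idx under the RocksDB dir.
      static final String IDX_FILE = "transfer.idx";

      static int loadCheckpoint() throws IOException {
        if (!Files.exists(Paths.get(IDX_FILE))) {
          return 0;
        }
        try (BufferedReader br = new BufferedReader(new FileReader(IDX_FILE))) {
          String idxStr = br.readLine();
          return (idxStr == null || idxStr.isEmpty()) ? 0 : Integer.parseInt(idxStr.trim());
        }
      }

      static void replay(List<String> log) throws IOException {
        int startIdx = loadCheckpoint();
        int currentIdx = 0;
        for (String entry : log) {
          currentIdx++;
          if (currentIdx <= startIdx) {
            continue; // already applied during a previous run
          }
          System.out.println("applying " + entry);
        }
        // Persist the high-water mark so the next run skips everything applied so far.
        try (FileWriter writer = new FileWriter(IDX_FILE)) {
          writer.write(String.valueOf(currentIdx));
        }
      }
    }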

[iotdb] 23/45: fix some bugs

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a54d080049b66372f6a926152c0d39373e216b4c
Author: chengjianyun <ch...@360.cn>
AuthorDate: Thu Mar 10 11:52:04 2022 +0800

    fix some bugs
---
 .../apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java | 15 ++++++++++++---
 .../iotdb/db/metadata/rocksdb/MetaDataTransfer.java       |  3 +++
 .../db/metadata/rocksdb/RocksDBReadWriteHandler.java      |  4 +++-
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 671fcbe..e2375c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -833,9 +833,18 @@ public class MRocksDBManager implements IMetaManager {
                     levelKey, RocksDBMNodeType.STORAGE_GROUP, DEFAULT_NODE_VALUE);
               }
             } else {
-              boolean hasChild = !keyCheckResult.getResult(RocksDBMNodeType.STORAGE_GROUP);
-              throw new StorageGroupAlreadySetException(
-                  RocksDBUtils.concatNodesName(nodes, 0, i), hasChild);
+              if (i >= len - 1) {
+                if (keyCheckResult.getExistType() == RocksDBMNodeType.STORAGE_GROUP) {
+                  throw new StorageGroupAlreadySetException(storageGroup.getFullPath());
+                } else {
+                  throw new PathAlreadyExistException(storageGroup.getFullPath());
+                }
+              } else {
+                if (keyCheckResult.getExistType() != RocksDBMNodeType.INTERNAL) {
+                  throw new StorageGroupAlreadySetException(
+                      RocksDBUtils.concatNodesName(nodes, 0, i), true);
+                }
+              }
             }
           } finally {
             lock.unlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
index c3c8906..89e59d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
@@ -320,6 +320,9 @@ public class MetaDataTransfer {
         .forEach(
             sgNode -> {
               try {
+                if (sgNode.getPartialPath().getFullPath() == "root.iotcloud") {
+                  return;
+                }
                 rocksDBManager.setStorageGroup(sgNode.getPartialPath());
                 if (sgNode.getDataTTL() > 0) {
                   rocksDBManager.setTTL(sgNode.getPartialPath(), sgNode.getDataTTL());
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
index 69d8e7e..cac86e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
@@ -268,12 +268,14 @@ public class RocksDBReadWriteHandler {
     if (!rocksDB.keyMayExist(key, holder)) {
       exist = false;
     } else {
-      if (holder.getValue() != null) {
+      if (holder.getValue() == null) {
         byte[] value = rocksDB.get(key);
         if (value != null) {
           exist = true;
           holder.setValue(value);
         }
+      } else {
+        exist = true;
       }
     }
     return exist;
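
The corrected keyExist treats a value already fetched by keyMayExist as proof of existence and only falls back to a real get() when the holder came back empty (the earlier version inverted that null check). A standalone sketch of the idiom against the public RocksDB Java API, with an illustrative class name:

    import org.rocksdb.Holder;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class KeyExistIdiom {

      // keyMayExist() may answer "maybe" without loading the value, so an empty
      // holder still needs a get() to confirm; a filled holder already proves existence.
      static boolean keyExist(RocksDB db, byte[] key, Holder<byte[]> holder) throws RocksDBException {
        if (!db.keyMayExist(key, holder)) {
          return false; // definitely not present
        }
        if (holder.getValue() != null) {
          return true; // value was fetched as a side effect of keyMayExist
        }
        byte[] value = db.get(key);
        if (value == null) {
          return false; // keyMayExist gave a false positive
        }
        holder.setValue(value);
        return true;
      }
    }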

[iotdb] 35/45: fix npe when init sgMNode

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 16ec2775c824761a2d3d7df29e352282e4e6a1c3
Author: lisijia <li...@360.cn>
AuthorDate: Mon Mar 14 11:20:57 2022 +0800

    fix npe when init sgMNode
---
 .../org/apache/iotdb/db/metadata/rocksdb/mnode/RStorageGroupMNode.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RStorageGroupMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RStorageGroupMNode.java
index 198ca29..e303cdb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RStorageGroupMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RStorageGroupMNode.java
@@ -43,7 +43,7 @@ public class RStorageGroupMNode extends RInternalMNode implements IStorageGroupM
   public RStorageGroupMNode(String fullPath, byte[] value) {
     super(fullPath);
     Object ttl = RocksDBUtils.parseNodeValue(value, RMNodeValueType.TTL);
-    if (ttl != null) {
+    if (ttl == null) {
       ttl = IoTDBDescriptor.getInstance().getConfig().getDefaultTTL();
     }
     this.dataTTL = (long) ttl;

[iotdb] 32/45: use max value replace magic number

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7f181c070f1e81c862468933f74d9e9b3a9c3816
Author: lisijia <li...@360.cn>
AuthorDate: Fri Mar 11 16:23:02 2022 +0800

    use max value replace magic number
---
 .../main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index c0dd427..80c9084 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -893,7 +893,7 @@ public class MRocksDBManager implements IMetaManager {
                               byte[] startKey = RocksDBUtils.toRocksDBKey(startPath, type);
                               byte[] endKey = new byte[startKey.length];
                               System.arraycopy(startKey, 0, endKey, 0, startKey.length - 1);
-                              endKey[endKey.length - 1] = 127;
+                              endKey[endKey.length - 1] = Byte.MAX_VALUE;
                               if (type == NODE_TYPE_MEASUREMENT) {
                                 readWriteHandler.deleteNodeByPrefix(
                                     readWriteHandler.getCFHByName(TABLE_NAME_TAGS),

[iotdb] 38/45: update

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2b12d6884682a733b0a391f13129382da9030f1a
Author: lisijia <li...@360.cn>
AuthorDate: Mon Mar 14 18:40:55 2022 +0800

    update
---
 .../main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index 616a0fa..680696c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -367,7 +367,7 @@ public class RocksDBUtils {
     StringBuilder stringBuilder = new StringBuilder(nodeType + ROOT);
     for (int i = partialPath.indexOf(PATH_SEPARATOR); i < partialPath.length(); i++) {
       char currentChar = partialPath.charAt(i);
-      stringBuilder.append(partialPath.charAt(i));
+      stringBuilder.append(currentChar);
       if (currentChar == SPLIT_FLAG) {
         stringBuilder.append(level);
       }

[iotdb] 06/45: fix bug of full path query

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 482137224a29b477e7a8a8ccf83e93145375f9e3
Author: lisijia <li...@360.cn>
AuthorDate: Fri Mar 4 12:14:35 2022 +0800

    fix bug of full path query
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 23 +++++++++++++++++-----
 .../iotdb/db/metadata/rocksdb/RocksDBUtils.java    |  7 +++++--
 .../db/metadata/rocksdb/MRocksDBUnitTest.java      | 23 +++++++++++-----------
 3 files changed, 35 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 0896d34..ec2bd73 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -973,11 +973,24 @@ public class MRocksDBManager implements IMetaManager {
 
     int indexOfPrefix = indexOfFirstWildcard(nodes, startIndex);
     if (indexOfPrefix >= nodes.length) {
-      StringBuilder stringBuilder = new StringBuilder();
-      for (int i = 0; i < nodes.length; i++) {
-        stringBuilder.append(RockDBConstants.PATH_SEPARATOR).append(nodes[i]);
-      }
-      consumer.accept(stringBuilder.substring(1));
+      Arrays.stream(nodeTypeArray)
+          .parallel()
+          .forEach(
+              x -> {
+                String levelPrefix =
+                    RocksDBUtils.convertPartialPathToInnerByNodes(nodes, nodes.length - 1, x);
+                try {
+                  if (readWriteHandler.keyExist(levelPrefix.getBytes())) {
+                    StringBuilder stringBuilder = new StringBuilder();
+                    for (String node : nodes) {
+                      stringBuilder.append(RockDBConstants.PATH_SEPARATOR).append(node);
+                    }
+                    consumer.accept(stringBuilder.substring(1));
+                  }
+                } catch (RocksDBException e) {
+                  logger.error(e.getMessage());
+                }
+              });
       return;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index c4f4e17..2bf01f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -365,8 +365,11 @@ public class RocksDBUtils {
   public static String convertPartialPathToInnerByNodes(String[] nodes, int level, char nodeType) {
     StringBuilder stringBuilder = new StringBuilder();
     stringBuilder.append(nodeType).append(ROOT);
-    for (String str : nodes) {
-      stringBuilder.append(SPLIT_FLAG).append(level).append(str);
+    for (int i = 0; i < nodes.length; i++) {
+      if (i == 0 && nodes[i].equals(ROOT_STRING)) {
+        continue;
+      }
+      stringBuilder.append(SPLIT_FLAG).append(level).append(nodes[i]);
     }
     return stringBuilder.toString();
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java
index a0ce63f..1d37cf8 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java
@@ -7,15 +7,14 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 import static org.apache.iotdb.db.metadata.rocksdb.RocksDBReadWriteHandler.ROCKSDB_PATH;
 
@@ -177,20 +176,22 @@ public class MRocksDBUnitTest {
     Assert.assertEquals(
         1,
         mRocksDBManager.getStorageGroupNum(new PartialPath("root.inner1.inner2.inner3.sg"), false));
-    Assert.assertEquals(2, mRocksDBManager.getStorageGroupNum(new PartialPath("root.inner"), true));
-    Assert.assertEquals(6, mRocksDBManager.getStorageGroupNum(new PartialPath("root"), true));
+    Assert.assertEquals(
+        2, mRocksDBManager.getStorageGroupNum(new PartialPath("root.inner.**"), false));
+    Assert.assertEquals(6, mRocksDBManager.getStorageGroupNum(new PartialPath("root.**"), false));
 
     // test all timeseries number
     Assert.assertEquals(
         1, mRocksDBManager.getAllTimeseriesCount(new PartialPath("root.tt.sg.dd.m1")));
-    Assert.assertEquals(2, mRocksDBManager.getAllTimeseriesCount(new PartialPath("root"), true));
+    Assert.assertEquals(
+        2, mRocksDBManager.getAllTimeseriesCount(new PartialPath("root.**"), false));
 
     // test device number
     Assert.assertEquals(0, mRocksDBManager.getDevicesNum(new PartialPath("root.inner1.inner2")));
     Assert.assertEquals(
-        0, mRocksDBManager.getDevicesNum(new PartialPath("root.inner1.inner2"), true));
-    Assert.assertEquals(2, mRocksDBManager.getDevicesNum(new PartialPath("root.tt.sg"), true));
-    Assert.assertEquals(1, mRocksDBManager.getDevicesNum(new PartialPath("root.tt.sg.dd"), true));
+        0, mRocksDBManager.getDevicesNum(new PartialPath("root.inner1.inner2.**"), false));
+    Assert.assertEquals(2, mRocksDBManager.getDevicesNum(new PartialPath("root.tt.sg.**"), false));
+    Assert.assertEquals(1, mRocksDBManager.getDevicesNum(new PartialPath("root.tt.sg.dd"), false));
 
     // todo wildcard
 

[iotdb] 36/45: fix bug of creating aligned timeseries

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a068a418720cea57a68ad5d4faaf17ec964f9cd6
Author: lisijia <li...@360.cn>
AuthorDate: Mon Mar 14 12:05:36 2022 +0800

    fix bug of creating aligned timeseries
---
 .../apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java    | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 1e3d866..bf6ccfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -122,7 +122,15 @@ import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ALL_NODE_TYPE_ARRAY;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_NODE_VALUE;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 /**
@@ -2251,7 +2259,7 @@ public class MRocksDBManager implements IMetaManager {
 
     // check insert non-aligned InsertPlan for aligned timeseries
     if (deviceMNode.isEntity()) {
-      if (plan.isAligned() && deviceMNode.getAsEntityMNode().isAligned()) {
+      if (plan.isAligned() && !deviceMNode.getAsEntityMNode().isAligned()) {
         throw new MetadataException(
             String.format(
                 "Timeseries under path [%s] is not aligned , please set InsertPlan.isAligned() = false",

[iotdb] 16/45: incorrect response to set storage group

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 769ce5a2fa5f417829b78d22adae5d47a2074f1c
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 9 16:02:02 2022 +0800

    incorrect response to set storage group
---
 .../java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 7b0b5f1..7f4f0a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -812,8 +812,9 @@ public class MRocksDBManager implements IMetaManager {
                 readWriteHandler.createNode(
                     levelKey, RocksDBMNodeType.STORAGE_GROUP, DEFAULT_NODE_VALUE);
               }
-            } else if (keyCheckResult.getResult(RocksDBMNodeType.STORAGE_GROUP)) {
-              throw new StorageGroupAlreadySetException(storageGroup.toString());
+            } else {
+              boolean hasChild = !keyCheckResult.getResult(RocksDBMNodeType.STORAGE_GROUP);
+              throw new StorageGroupAlreadySetException(storageGroup.toString(), hasChild);
             }
           } finally {
             lock.unlock();
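
    The hunk above runs inside a per-level lock. A standalone sketch of that pattern follows:
    a ConcurrentHashMap lock pool keyed by level path, computeIfAbsent to create the lock, and
    tryLock with a timeout. The names locksPool and MAX_LOCK_WAIT_TIME mirror the surrounding
    diffs; the timeout value here is an assumption.

        import java.util.Map;
        import java.util.concurrent.ConcurrentHashMap;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.locks.Lock;
        import java.util.concurrent.locks.ReentrantLock;

        public class LockPoolSketch {
          private final Map<String, Lock> locksPool = new ConcurrentHashMap<>();
          private static final long MAX_LOCK_WAIT_TIME = 300;  // milliseconds, assumed value

          // Run the critical section (e.g. create the node, or throw
          // StorageGroupAlreadySetException) under the lock for one level key.
          public void withLevelLock(String levelKey, Runnable criticalSection) throws InterruptedException {
            Lock lock = locksPool.computeIfAbsent(levelKey, x -> new ReentrantLock());
            if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
              try {
                criticalSection.run();
              } finally {
                lock.unlock();
              }
            } else {
              throw new IllegalStateException("acquire lock timeout, " + levelKey);
            }
          }
        }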

[iotdb] 37/45: fix npe when query aligned timeseries

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 25421c3aed3859c4d340e89b109dd758ff368115
Author: lisijia <li...@360.cn>
AuthorDate: Mon Mar 14 18:24:36 2022 +0800

    fix npe when query aligned timeseries
---
 .../iotdb/db/metadata/rocksdb/RocksDBUtils.java    | 43 ++++++++++++++++------
 .../iotdb/db/metadata/rocksdb/mnode/RMNode.java    |  4 +-
 .../metadata/rocksdb/mnode/RMeasurementMNode.java  |  2 +-
 3 files changed, 35 insertions(+), 14 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index 392ba35..616a0fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -43,9 +43,34 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_ROOT;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ALIAS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ATTRIBUTES;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ORIGIN_KEY;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_TAGS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_TTL;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_VERSION;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_FLAG;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ESCAPE_PATH_SEPARATOR;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_ALIAS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_ATTRIBUTES;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_SCHEMA;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_TAGS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ALIAS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_INTERNAL;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.PATH_SEPARATOR;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT_CHAR;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT_STRING;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
 
 public class RocksDBUtils {
 
@@ -339,17 +364,13 @@ public class RocksDBUtils {
    * @return inner name
    */
   public static String convertPartialPathToInner(String partialPath, int level, char nodeType) {
-    char lastChar = START_FLAG;
-    StringBuilder stringBuilder = new StringBuilder();
-    for (char c : partialPath.toCharArray()) {
-      if (START_FLAG == lastChar) {
-        stringBuilder.append(nodeType);
-      }
-      if (SPLIT_FLAG == lastChar) {
+    StringBuilder stringBuilder = new StringBuilder(nodeType + ROOT);
+    for (int i = partialPath.indexOf(PATH_SEPARATOR); i < partialPath.length(); i++) {
+      char currentChar = partialPath.charAt(i);
+      stringBuilder.append(partialPath.charAt(i));
+      if (currentChar == SPLIT_FLAG) {
         stringBuilder.append(level);
       }
-      stringBuilder.append(c);
-      lastChar = c;
     }
     return stringBuilder.toString();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMNode.java
index 460072d..872adac 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMNode.java
@@ -88,7 +88,7 @@ public abstract class RMNode implements IMNode {
       String parentInnerName =
           RocksDBUtils.convertPartialPathToInner(keyName, nodeNameMaxLevel, type.getValue());
       try {
-        value = readWriteHandler.get(null, (type + parentInnerName).getBytes());
+        value = readWriteHandler.get(null, parentInnerName.getBytes());
       } catch (RocksDBException e) {
         logger.error("Failed to get parent node.", e);
       }
@@ -101,7 +101,7 @@ public abstract class RMNode implements IMNode {
             node = new RInternalMNode(keyName);
             return node;
           case RockDBConstants.NODE_TYPE_ENTITY:
-            node = new REntityMNode(keyName);
+            node = new REntityMNode(keyName, value);
             return node;
           case RockDBConstants.NODE_TYPE_MEASUREMENT:
             node = new RMeasurementMNode(keyName, value);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMeasurementMNode.java
index e619d3c..32d5348 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMeasurementMNode.java
@@ -72,7 +72,7 @@ public class RMeasurementMNode extends RMNode implements IMeasurementMNode {
   @Override
   public MeasurementPath getMeasurementPath() {
     MeasurementPath result = new MeasurementPath(super.getPartialPath(), schema);
-    //    result.setUnderAlignedEntity(getParent().isAligned());
+    result.setUnderAlignedEntity(getParent().isAligned());
     if (alias != null && !alias.isEmpty()) {
       result.setMeasurementAlias(alias);
     }
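
    The rewritten convertPartialPathToInner above builds the RocksDB key by prefixing a
    node-type character and inserting the level number after every path separator (an example
    comment later in this digest shows root.a.b stored under a key like sroot.2a.2bbb). A
    standalone sketch of that encoding, with the separator and prefix hard-coded as assumptions
    rather than taken from RockDBConstants:

        // toInnerKey("root.a.bbb", 2, 's') returns "sroot.2a.2bbb"
        static String toInnerKey(String partialPath, int level, char nodeType) {
          StringBuilder sb = new StringBuilder().append(nodeType);
          for (char c : partialPath.toCharArray()) {
            sb.append(c);
            if (c == '.') {          // assumed SPLIT_FLAG / PATH_SEPARATOR
              sb.append(level);      // level marker follows every separator
            }
          }
          return sb.toString();
        }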

[iotdb] 11/45: update the method of getting the schema

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aeb546b00e3e0e9badc52c83eb9d778bd16221ae
Author: lisijia <li...@360.cn>
AuthorDate: Tue Mar 8 15:52:58 2022 +0800

    update the method of getting the schema
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 93 +++++++++++++++-------
 1 file changed, 63 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index ff41b13..a705ae6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -113,13 +113,23 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
-import java.util.function.Consumer;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_NODE_VALUE;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_SCHEMA;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 /**
@@ -946,14 +956,17 @@ public class MRocksDBManager implements IMetaManager {
   }
 
   public void traverseOutcomeBasins(
-      String[] nodes, int maxLevel, Consumer<String> consumer, Character[] nodeTypeArray)
+      String[] nodes,
+      int maxLevel,
+      BiFunction<String, byte[], Boolean> function,
+      Character[] nodeTypeArray)
       throws IllegalPathException {
     List<String[]> allNodesArray = RocksDBUtils.replaceMultiWildcardToSingle(nodes, maxLevel);
-    allNodesArray.parallelStream().forEach(x -> traverseByPatternPath(x, consumer, nodeTypeArray));
+    allNodesArray.parallelStream().forEach(x -> traverseByPatternPath(x, function, nodeTypeArray));
   }
 
   public void traverseByPatternPath(
-      String[] nodes, Consumer<String> consumer, Character[] nodeTypeArray) {
+      String[] nodes, BiFunction<String, byte[], Boolean> function, Character[] nodeTypeArray) {
     //    String[] nodes = pathPattern.getNodes();
 
     int startIndex = 0;
@@ -968,12 +981,14 @@ public class MRocksDBManager implements IMetaManager {
                 String levelPrefix =
                     RocksDBUtils.convertPartialPathToInnerByNodes(nodes, nodes.length - 1, x);
                 try {
-                  if (readWriteHandler.keyExist(levelPrefix.getBytes())) {
+                  Holder<byte[]> holder = new Holder<>();
+                  readWriteHandler.keyExist(levelPrefix.getBytes(), holder);
+                  if (holder.getValue() != null) {
                     StringBuilder stringBuilder = new StringBuilder();
                     for (String node : nodes) {
                       stringBuilder.append(RockDBConstants.PATH_SEPARATOR).append(node);
                     }
-                    consumer.accept(stringBuilder.substring(1));
+                    function.apply(stringBuilder.substring(1), holder.getValue());
                   }
                 } catch (RocksDBException e) {
                   logger.error(e.getMessage());
@@ -1018,8 +1033,9 @@ public class MRocksDBManager implements IMetaManager {
                             }
                             if (RocksDBUtils.suffixMatch(iterator.key(), suffixToMatch)) {
                               if (lastIteration) {
-                                consumer.accept(
-                                    RocksDBUtils.getPathByInnerName(new String(iterator.key())));
+                                function.apply(
+                                    RocksDBUtils.getPathByInnerName(new String(iterator.key())),
+                                    iterator.value());
                               } else {
                                 tempNodes.add(RocksDBUtils.toMetaNodes(iterator.key()));
                               }
@@ -1152,9 +1168,12 @@ public class MRocksDBManager implements IMetaManager {
 
   private int getCountByNodeType(Character[] nodetype, String[] nodes) throws IllegalPathException {
     AtomicInteger atomicInteger = new AtomicInteger(0);
-    Consumer<String> consumer = s -> atomicInteger.incrementAndGet();
-
-    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, consumer, nodetype);
+    BiFunction<String, byte[], Boolean> function =
+        (a, b) -> {
+          atomicInteger.incrementAndGet();
+          return true;
+        };
+    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, nodetype);
     return atomicInteger.get();
   }
 
@@ -1408,9 +1427,12 @@ public class MRocksDBManager implements IMetaManager {
       String[] nodes, Character[] nodetype, Collection<PartialPath> collection)
       throws IllegalPathException {
     List<String> allResult = Collections.synchronizedList(new ArrayList<>());
-    Consumer<String> consumer = allResult::add;
-
-    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, consumer, nodetype);
+    BiFunction<String, byte[], Boolean> function =
+        (a, b) -> {
+          allResult.add(a);
+          return true;
+        };
+    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, nodetype);
 
     for (String path : allResult) {
       collection.add(new PartialPath(path));
@@ -1525,20 +1547,7 @@ public class MRocksDBManager implements IMetaManager {
   @Override
   public List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
-    List<MeasurementPath> result = new ArrayList<>();
-    Map<String, byte[]> allMeasurement =
-        getKeyNumByPrefix(pathPattern, NODE_TYPE_MEASUREMENT, isPrefixMatch);
-    for (Entry<String, byte[]> entry : allMeasurement.entrySet()) {
-      try {
-        MeasurementSchema schema =
-            (MeasurementSchema) RocksDBUtils.parseNodeValue(entry.getValue(), FLAG_IS_SCHEMA);
-        PartialPath path = new PartialPath(RocksDBUtils.getPathByInnerName(entry.getKey()));
-        result.add(new MeasurementPath(path, schema));
-      } catch (ClassCastException e) {
-        throw new MetadataException(e);
-      }
-    }
-    return result;
+    return getMatchedMeasurementPath(pathPattern.getNodes());
   }
 
   /**
@@ -1565,7 +1574,31 @@ public class MRocksDBManager implements IMetaManager {
   public Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
       PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
       throws MetadataException {
-    return null;
+    // todo update offset
+    Pair<List<MeasurementPath>, Integer> result =
+        new Pair<>(getMatchedMeasurementPath(pathPattern.getNodes()), offset + limit);
+    return result;
+  }
+
+  private List<MeasurementPath> getMatchedMeasurementPath(String[] nodes)
+      throws IllegalPathException {
+    List<MeasurementPath> allResult = Collections.synchronizedList(new ArrayList<>());
+    BiFunction<String, byte[], Boolean> function =
+        (a, b) -> {
+          try {
+            allResult.add(
+                new MeasurementPath(
+                    new PartialPath(a),
+                    (MeasurementSchema) RocksDBUtils.parseNodeValue(b, FLAG_IS_SCHEMA)));
+            return true;
+          } catch (IllegalPathException e) {
+            logger.error(e.getMessage());
+            return false;
+          }
+        };
+    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
+
+    return allResult;
   }
 
   @Override
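
    The change above swaps Consumer<String> for BiFunction<String, byte[], Boolean> so each
    visitor sees both the matched path and the stored value bytes. A standalone sketch of the
    two callback styles used in the hunks (counting and collecting); visitAll is a stand-in for
    traverseOutcomeBasins, backed by a plain map instead of RocksDB.

        import java.util.ArrayList;
        import java.util.Collections;
        import java.util.LinkedHashMap;
        import java.util.List;
        import java.util.Map;
        import java.util.concurrent.atomic.AtomicInteger;
        import java.util.function.BiFunction;

        public class TraversalCallbackSketch {
          static void visitAll(Map<String, byte[]> store, BiFunction<String, byte[], Boolean> visitor) {
            store.forEach(visitor::apply);   // return value ignored here; real code may stop early
          }

          public static void main(String[] args) {
            Map<String, byte[]> store = new LinkedHashMap<>();
            store.put("root.sg.d1.s1", new byte[] {0x01});
            store.put("root.sg.d1.s2", new byte[] {0x01});

            // counting callback, as in getCountByNodeType
            AtomicInteger counter = new AtomicInteger(0);
            visitAll(store, (path, value) -> { counter.incrementAndGet(); return true; });

            // collecting callback, as in the path-collection hunk
            List<String> paths = Collections.synchronizedList(new ArrayList<>());
            visitAll(store, (path, value) -> { paths.add(path); return true; });

            System.out.println(counter.get() + " " + paths);   // 2 [root.sg.d1.s1, root.sg.d1.s2]
          }
        }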

[iotdb] 09/45: spotless apply

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f15a6d1902c9e7e6d994738d999c53526065c660
Author: chengjianyun <ch...@360.cn>
AuthorDate: Fri Mar 4 19:00:29 2022 +0800

    spotless apply
---
 .../org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java   | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java
index 1d37cf8..4aca957 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java
@@ -7,15 +7,16 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import static org.apache.iotdb.db.metadata.rocksdb.RocksDBReadWriteHandler.ROCKSDB_PATH;
 
 public class MRocksDBUnitTest {

[iotdb] 33/45: [RocksDB] code refine

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e1fb08fea92f7a3ae51c4717edf3a033e4b7f0bd
Author: chengjianyun <ch...@360.cn>
AuthorDate: Fri Mar 11 19:11:36 2022 +0800

    [RocksDB] code refine
---
 .../org/apache/iotdb/db/metadata/IMetaManager.java |   3 -
 .../org/apache/iotdb/db/metadata/MManager.java     |   5 -
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 132 +--------------------
 .../iotdb/db/metadata/rocksdb/RMNodeValueType.java |  18 +++
 .../iotdb/db/metadata/rocksdb/RockDBConstants.java |   2 +-
 .../iotdb/db/metadata/rocksdb/RocksDBUtils.java    |  78 ++++++------
 .../metadata/rocksdb/mnode/RStorageGroupMNode.java |   4 +-
 7 files changed, 67 insertions(+), 175 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/IMetaManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/IMetaManager.java
index fcc12da..37a0823 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/IMetaManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/IMetaManager.java
@@ -247,9 +247,6 @@ public interface IMetaManager {
   List<MeasurementPath> getAllMeasurementByDevicePath(PartialPath devicePath)
       throws PathNotExistException;
 
-  Map<PartialPath, IMeasurementSchema> getAllMeasurementSchemaByPrefix(PartialPath prefixPath)
-      throws MetadataException;
-
   IStorageGroupMNode getStorageGroupNodeByStorageGroupPath(PartialPath path)
       throws MetadataException;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 8efe84e..60fdea1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -1510,11 +1510,6 @@ public class MManager implements IMetaManager {
     return new ArrayList<>(res);
   }
 
-  @Override
-  public Map<PartialPath, IMeasurementSchema> getAllMeasurementSchemaByPrefix(
-      PartialPath prefixPath) throws MetadataException {
-    return mtree.getAllMeasurementSchemaByPrefix(prefixPath);
-  }
   // endregion
   // endregion
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 80c9084..ceb8f40 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -118,7 +118,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
@@ -1116,84 +1115,6 @@ public class MRocksDBManager implements IMetaManager {
     return index;
   }
 
-  private Map<String, byte[]> getKeyNumByPrefix(
-      PartialPath pathPattern, char nodeType, boolean isPrefixMatch) {
-    Map<String, byte[]> result = new ConcurrentHashMap<>();
-    Set<String> seeds = new HashSet<>();
-
-    String seedPath;
-
-    int nonWildcardAvailablePosition =
-        pathPattern.getFullPath().indexOf(ONE_LEVEL_PATH_WILDCARD) - 1;
-    if (nonWildcardAvailablePosition < 0) {
-      seedPath = RocksDBUtils.getLevelPath(pathPattern.getNodes(), pathPattern.getNodeLength() - 1);
-    } else {
-      seedPath = pathPattern.getFullPath().substring(0, nonWildcardAvailablePosition);
-    }
-
-    seeds.add(new String(RocksDBUtils.toRocksDBKey(seedPath, nodeType)));
-
-    scanAllKeysRecursively(
-        seeds,
-        0,
-        s -> {
-          try {
-            byte[] value = readWriteHandler.get(null, s.getBytes());
-            if (value != null && value.length > 0 && s.charAt(0) == nodeType) {
-              if (!isPrefixMatch || isMatched(pathPattern, s)) {
-                result.put(s, value);
-                return false;
-              }
-            }
-          } catch (RocksDBException e) {
-            return false;
-          }
-          return true;
-        },
-        isPrefixMatch);
-    return result;
-  }
-
-  // eg. pathPatter:root.a.b     prefixedKey=sroot.2a.2bbb
-  private boolean isMatched(PartialPath pathPattern, String prefixedKey) {
-    // path = root.a.bbb
-    String path = RocksDBUtils.getPathByInnerName(prefixedKey);
-    if (path.length() <= pathPattern.getFullPath().length()) {
-      return true;
-    } else {
-      String fullPath = pathPattern.getFullPath() + RockDBConstants.PATH_SEPARATOR;
-      return path.startsWith(fullPath);
-    }
-  }
-
-  private void scanAllKeysRecursively(
-      Set<String> seeds, int level, Function<String, Boolean> op, boolean isPrefixMatch) {
-    if (seeds == null || seeds.isEmpty()) {
-      return;
-    }
-    Set<String> children = ConcurrentHashMap.newKeySet();
-    seeds
-        .parallelStream()
-        .forEach(
-            x -> {
-              if (op.apply(x)) {
-                if (isPrefixMatch) {
-                  for (int i = level; i < MAX_PATH_DEPTH; i++) {
-                    // x is not leaf node
-                    String nextLevel = RocksDBUtils.getNextLevelOfPath(x, i);
-                    children.addAll(readWriteHandler.getAllByPrefix(nextLevel));
-                  }
-                } else {
-                  String nextLevel = RocksDBUtils.getNextLevelOfPath(x, level);
-                  children.addAll(readWriteHandler.getAllByPrefix(nextLevel));
-                }
-              }
-            });
-    if (!children.isEmpty()) {
-      scanAllKeysRecursively(children, level + 1, op, isPrefixMatch);
-    }
-  }
-
   @Override
   public int getAllTimeseriesCount(PartialPath pathPattern) throws MetadataException {
     return getAllTimeseriesCount(pathPattern, false);
@@ -1550,21 +1471,6 @@ public class MRocksDBManager implements IMetaManager {
     return allPath;
   }
 
-  private Set<PartialPath> getMatchedPathWithNodeType(
-      boolean isPrefixMatch, PartialPath pathPattern, char nodeType) throws MetadataException {
-    Set<PartialPath> result = new HashSet<>();
-    Map<String, byte[]> allMeasurement = getKeyNumByPrefix(pathPattern, nodeType, isPrefixMatch);
-    for (Entry<String, byte[]> entry : allMeasurement.entrySet()) {
-      try {
-        PartialPath path = new PartialPath(RocksDBUtils.getPathByInnerName(entry.getKey()));
-        result.add(path);
-      } catch (ClassCastException | IllegalPathException e) {
-        throw new MetadataException(e);
-      }
-    }
-    return result;
-  }
-
   /**
    * Get all device paths and according storage group paths as ShowDevicesResult.
    *
@@ -1767,10 +1673,10 @@ public class MRocksDBManager implements IMetaManager {
           RocksDBUtils.getLevelPath(fullPath.getNodes(), fullPath.getNodeLength() - 1);
       Holder<byte[]> holder = new Holder<>();
       if (readWriteHandler.keyExistByType(levelKey, RocksDBMNodeType.MEASUREMENT, holder)) {
-        IMeasurementSchema schema =
+        MeasurementSchema measurementSchema =
             (MeasurementSchema)
-                RocksDBUtils.parseNodeValue(holder.getValue(), DATA_BLOCK_TYPE_SCHEMA);
-        return schema;
+                RocksDBUtils.parseNodeValue(holder.getValue(), RMNodeValueType.SCHEMA);
+        return measurementSchema;
       } else {
         throw new PathNotExistException(fullPath.getFullPath());
       }
@@ -1796,30 +1702,12 @@ public class MRocksDBManager implements IMetaManager {
         throw new PathNotExistException(e.getMessage());
       }
       MeasurementSchema measurementSchema =
-          (MeasurementSchema) RocksDBUtils.parseNodeValue(entry.getValue(), FLAG_IS_SCHEMA);
+          (MeasurementSchema) RocksDBUtils.parseNodeValue(entry.getValue(), RMNodeValueType.SCHEMA);
       result.add(new MeasurementPath(pathName, measurementSchema));
     }
     return result;
   }
 
-  @Override
-  public Map<PartialPath, IMeasurementSchema> getAllMeasurementSchemaByPrefix(
-      PartialPath prefixPath) throws MetadataException {
-    Map<PartialPath, IMeasurementSchema> result = new HashMap<>();
-    Map<String, byte[]> allMeasurement = getKeyNumByPrefix(prefixPath, NODE_TYPE_MEASUREMENT, true);
-    for (Entry<String, byte[]> entry : allMeasurement.entrySet()) {
-      try {
-        MeasurementSchema schema =
-            (MeasurementSchema) RocksDBUtils.parseNodeValue(entry.getValue(), FLAG_IS_SCHEMA);
-        PartialPath path = new PartialPath(RocksDBUtils.getPathByInnerName(entry.getKey()));
-        result.put(path, schema);
-      } catch (ClassCastException e) {
-        throw new MetadataException(e);
-      }
-    }
-    return result;
-  }
-
   /**
    * E.g., root.sg is storage group given [root, sg], return the MNode of root.sg given [root, sg,
    * device], return the MNode of root.sg Get storage group node by path. If storage group is not
@@ -1842,13 +1730,9 @@ public class MRocksDBManager implements IMetaManager {
         String levelPath = RocksDBUtils.getLevelPath(nodes, i);
         Holder<byte[]> holder = new Holder<>();
         if (readWriteHandler.keyExistByType(levelPath, RocksDBMNodeType.STORAGE_GROUP, holder)) {
-          Object ttl = RocksDBUtils.parseNodeValue(holder.getValue(), RockDBConstants.FLAG_SET_TTL);
-          if (ttl == null) {
-            ttl = config.getDefaultTTL();
-          }
           node =
               new RStorageGroupMNode(
-                  MetaUtils.getStorageGroupPathByLevel(path, i).getFullPath(), (Long) ttl);
+                  MetaUtils.getStorageGroupPathByLevel(path, i).getFullPath(), holder.getValue());
           break;
         }
       }
@@ -1873,13 +1757,9 @@ public class MRocksDBManager implements IMetaManager {
       if (iterator.key()[0] != NODE_TYPE_SG) {
         break;
       }
-      Object ttl = RocksDBUtils.parseNodeValue(iterator.value(), FLAG_SET_TTL);
-      if (ttl == null) {
-        ttl = config.getDefaultTTL();
-      }
       result.add(
           new RStorageGroupMNode(
-              RocksDBUtils.getPathByInnerName(new String(iterator.key())), (Long) ttl));
+              RocksDBUtils.getPathByInnerName(new String(iterator.key())), iterator.value()));
     }
     return result;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RMNodeValueType.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RMNodeValueType.java
new file mode 100644
index 0000000..2dee074
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RMNodeValueType.java
@@ -0,0 +1,18 @@
+package org.apache.iotdb.db.metadata.rocksdb;
+
+public enum RMNodeValueType {
+  TTL(RockDBConstants.FLAG_SET_TTL, RockDBConstants.DATA_BLOCK_TYPE_TTL),
+  SCHEMA(RockDBConstants.FLAG_HAS_SCHEMA, RockDBConstants.DATA_BLOCK_TYPE_SCHEMA),
+  ALIAS(RockDBConstants.FLAG_HAS_ALIAS, RockDBConstants.DATA_BLOCK_TYPE_ALIAS),
+  TAGS(RockDBConstants.FLAG_HAS_TAGS, RockDBConstants.DATA_BLOCK_TYPE_TAGS),
+  ATTRIBUTES(RockDBConstants.FLAG_HAS_ATTRIBUTES, RockDBConstants.DATA_BLOCK_TYPE_ATTRIBUTES),
+  ORIGIN_KEY(null, RockDBConstants.DATA_BLOCK_TYPE_ORIGIN_KEY);
+
+  byte type;
+  Byte flag;
+
+  RMNodeValueType(Byte flag, byte type) {
+    this.type = type;
+    this.flag = flag;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RockDBConstants.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RockDBConstants.java
index dfc4df2..be49fd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RockDBConstants.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RockDBConstants.java
@@ -55,7 +55,7 @@ public class RockDBConstants {
   public static final byte DEFAULT_FLAG = 0x00;
 
   public static final byte FLAG_SET_TTL = 0x01;
-  public static final byte FLAG_IS_SCHEMA = 0x01 << 1;
+  public static final byte FLAG_HAS_SCHEMA = 0x01 << 1;
   public static final byte FLAG_HAS_ALIAS = 0x01 << 2;
   public static final byte FLAG_HAS_TAGS = 0x01 << 3;
   public static final byte FLAG_HAS_ATTRIBUTES = 0x01 << 4;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index 38906c0..392ba35 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -44,9 +44,8 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.*;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.PATH_SEPARATOR;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
 
 public class RocksDBUtils {
 
@@ -176,7 +175,7 @@ public class RocksDBUtils {
     }
 
     if (schema != null) {
-      flag = (byte) (flag | FLAG_IS_SCHEMA);
+      flag = (byte) (flag | FLAG_HAS_SCHEMA);
     }
 
     ReadWriteIOUtils.write(flag, outputStream);
@@ -214,16 +213,18 @@ public class RocksDBUtils {
     return ReadWriteIOUtils.readBytes(buffer, len);
   }
 
-  public static int indexOfDataBlockType(byte[] data, byte type) {
-    if ((data[1] & FLAG_SET_TTL) == 0) {
+  public static int indexOfDataBlockType(byte[] data, RMNodeValueType valueType) {
+    if (valueType.flag != null && (data[1] & valueType.flag) == 0) {
       return -1;
     }
 
     int index = -1;
     boolean typeExist = false;
+
     ByteBuffer byteBuffer = ByteBuffer.wrap(data);
-    // skip the version flag and node type flag
+    // skip the data version and filter byte
     ReadWriteIOUtils.readBytes(byteBuffer, 2);
+
     while (byteBuffer.hasRemaining()) {
       byte blockType = ReadWriteIOUtils.readByte(byteBuffer);
       index = byteBuffer.position();
@@ -248,7 +249,7 @@ public class RocksDBUtils {
           break;
       }
       // got the data we need,don't need to read any more
-      if (type == blockType) {
+      if (valueType.type == blockType) {
         typeExist = true;
         break;
       }
@@ -257,7 +258,7 @@ public class RocksDBUtils {
   }
 
   public static byte[] updateTTL(byte[] origin, long ttl) {
-    int index = indexOfDataBlockType(origin, DATA_BLOCK_TYPE_TTL);
+    int index = indexOfDataBlockType(origin, RMNodeValueType.TTL);
     if (index < 1) {
       byte[] ttlBlock = new byte[Long.BYTES + 1];
       ttlBlock[0] = DATA_BLOCK_TYPE_TTL;
@@ -277,44 +278,45 @@ public class RocksDBUtils {
    * parse value and return a specified type. if no data is required, null is returned.
    *
    * @param value value written in default table
-   * @param type the type of value to obtain
+   * @param valueType the type of value to obtain
    */
-  public static Object parseNodeValue(byte[] value, byte type) {
+  public static Object parseNodeValue(byte[] value, RMNodeValueType valueType) {
     ByteBuffer byteBuffer = ByteBuffer.wrap(value);
     // skip the version flag and node type flag
     ReadWriteIOUtils.readByte(byteBuffer);
     // get block type
-    byte flag = ReadWriteIOUtils.readByte(byteBuffer);
-
+    byte filter = ReadWriteIOUtils.readByte(byteBuffer);
     Object obj = null;
+    if (valueType.flag != null && (filter & valueType.flag) == 0) {
+      return obj;
+    }
+
     // this means that the following data contains the information we need
-    if ((flag & type) > 0) {
-      while (byteBuffer.hasRemaining()) {
-        byte blockType = ReadWriteIOUtils.readByte(byteBuffer);
-        switch (blockType) {
-          case DATA_BLOCK_TYPE_TTL:
-            obj = ReadWriteIOUtils.readLong(byteBuffer);
-            break;
-          case DATA_BLOCK_TYPE_ALIAS:
-            obj = ReadWriteIOUtils.readString(byteBuffer);
-            break;
-          case DATA_BLOCK_TYPE_ORIGIN_KEY:
-            obj = readOriginKey(byteBuffer);
-            break;
-          case DATA_BLOCK_TYPE_SCHEMA:
-            obj = MeasurementSchema.deserializeFrom(byteBuffer);
-            break;
-          case DATA_BLOCK_TYPE_TAGS:
-          case DATA_BLOCK_TYPE_ATTRIBUTES:
-            obj = ReadWriteIOUtils.readMap(byteBuffer);
-            break;
-          default:
-            break;
-        }
-        // got the data we need,don't need to read any more
-        if (type == blockType) {
+    while (byteBuffer.hasRemaining()) {
+      byte blockType = ReadWriteIOUtils.readByte(byteBuffer);
+      switch (blockType) {
+        case DATA_BLOCK_TYPE_TTL:
+          obj = ReadWriteIOUtils.readLong(byteBuffer);
           break;
-        }
+        case DATA_BLOCK_TYPE_ALIAS:
+          obj = ReadWriteIOUtils.readString(byteBuffer);
+          break;
+        case DATA_BLOCK_TYPE_ORIGIN_KEY:
+          obj = readOriginKey(byteBuffer);
+          break;
+        case DATA_BLOCK_TYPE_SCHEMA:
+          obj = MeasurementSchema.deserializeFrom(byteBuffer);
+          break;
+        case DATA_BLOCK_TYPE_TAGS:
+        case DATA_BLOCK_TYPE_ATTRIBUTES:
+          obj = ReadWriteIOUtils.readMap(byteBuffer);
+          break;
+        default:
+          break;
+      }
+      // got the data we need,don't need to read any more
+      if (valueType.type == blockType) {
+        break;
       }
     }
     return obj;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RStorageGroupMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RStorageGroupMNode.java
index da22224..198ca29 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RStorageGroupMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RStorageGroupMNode.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.metadata.rocksdb.mnode;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.metadata.logfile.MLogWriter;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
-import org.apache.iotdb.db.metadata.rocksdb.RockDBConstants;
+import org.apache.iotdb.db.metadata.rocksdb.RMNodeValueType;
 import org.apache.iotdb.db.metadata.rocksdb.RocksDBUtils;
 
 import java.io.IOException;
@@ -42,7 +42,7 @@ public class RStorageGroupMNode extends RInternalMNode implements IStorageGroupM
 
   public RStorageGroupMNode(String fullPath, byte[] value) {
     super(fullPath);
-    Object ttl = RocksDBUtils.parseNodeValue(value, RockDBConstants.FLAG_SET_TTL);
+    Object ttl = RocksDBUtils.parseNodeValue(value, RMNodeValueType.TTL);
     if (ttl != null) {
       ttl = IoTDBDescriptor.getInstance().getConfig().getDefaultTTL();
     }
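
    RMNodeValueType above pairs each data-block tag with the flag bit that marks its presence,
    and parseNodeValue walks the value as [data version][flag byte][tagged blocks]. A standalone
    sketch of that layout for a TTL block; FLAG_SET_TTL = 0x01 follows the RockDBConstants hunk,
    while the version and block-tag values here are illustrative assumptions.

        import java.nio.ByteBuffer;

        public class NodeValueSketch {
          static final byte DATA_VERSION = 0x00;         // assumed
          static final byte FLAG_SET_TTL = 0x01;         // from the RockDBConstants hunk
          static final byte DATA_BLOCK_TYPE_TTL = 0x10;  // illustrative tag value

          static byte[] encodeWithTtl(long ttl) {
            ByteBuffer buf = ByteBuffer.allocate(2 + 1 + Long.BYTES);
            buf.put(DATA_VERSION);
            buf.put(FLAG_SET_TTL);            // flag byte: a TTL block is present
            buf.put(DATA_BLOCK_TYPE_TTL);     // block tag
            buf.putLong(ttl);                 // block payload
            return buf.array();
          }

          static Long decodeTtl(byte[] value) {
            ByteBuffer buf = ByteBuffer.wrap(value);
            buf.get();                        // skip data version
            byte flags = buf.get();
            if ((flags & FLAG_SET_TTL) == 0) {
              return null;                    // flag not set: no TTL block stored
            }
            while (buf.hasRemaining()) {
              byte tag = buf.get();
              if (tag == DATA_BLOCK_TYPE_TTL) {
                return buf.getLong();
              }
              break;                          // a fuller version would skip other block types here
            }
            return null;
          }

          public static void main(String[] args) {
            System.out.println(decodeTtl(encodeWithTtl(86_400_000L)));  // 86400000
          }
        }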

[iotdb] 13/45: update the way of getting measurementPath

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a809ae3b4c9bd1a3260e533b9572056b74825889
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 9 11:13:06 2022 +0800

    update the way of getting measurementPath
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java       | 16 +++-------------
 .../db/metadata/rocksdb/mnode/RMeasurementMNode.java     |  3 +++
 2 files changed, 6 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 47b8dbb..f8a0f41 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -1576,9 +1576,7 @@ public class MRocksDBManager implements IMetaManager {
       PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
       throws MetadataException {
     // todo update offset
-    Pair<List<MeasurementPath>, Integer> result =
-        new Pair<>(getMatchedMeasurementPath(pathPattern.getNodes()), offset + limit);
-    return result;
+    return new Pair<>(getMatchedMeasurementPath(pathPattern.getNodes()), offset + limit);
   }
 
   private List<MeasurementPath> getMatchedMeasurementPath(String[] nodes)
@@ -1586,16 +1584,8 @@ public class MRocksDBManager implements IMetaManager {
     List<MeasurementPath> allResult = Collections.synchronizedList(new ArrayList<>());
     BiFunction<String, byte[], Boolean> function =
         (a, b) -> {
-          try {
-            allResult.add(
-                new MeasurementPath(
-                    new PartialPath(a),
-                    (MeasurementSchema) RocksDBUtils.parseNodeValue(b, FLAG_IS_SCHEMA)));
-            return true;
-          } catch (IllegalPathException e) {
-            logger.error(e.getMessage());
-            return false;
-          }
+          allResult.add(new RMeasurementMNode(a, b).getMeasurementPath());
+          return true;
         };
     traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMeasurementMNode.java
index 2b9f988..95ffdde 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMeasurementMNode.java
@@ -73,6 +73,9 @@ public class RMeasurementMNode extends RMNode implements IMeasurementMNode {
   public MeasurementPath getMeasurementPath() {
     MeasurementPath result = new MeasurementPath(super.getPartialPath(), schema);
     result.setUnderAlignedEntity(getParent().isAligned());
+    if (alias != null && !alias.isEmpty()) {
+      result.setMeasurementAlias(alias);
+    }
     return result;
   }
 

[iotdb] 24/45: fix npe when getBelongedToSG

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a543966f966cc53f42c280579e9590f0ae9a45f1
Author: lisijia <li...@360.cn>
AuthorDate: Thu Mar 10 14:28:58 2022 +0800

    fix npe when getBelongedToSG
---
 .../iotdb/db/metadata/rocksdb/GetBelongedToSpecifiedType.java       | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/GetBelongedToSpecifiedType.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/GetBelongedToSpecifiedType.java
index 3100904..61535ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/GetBelongedToSpecifiedType.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/GetBelongedToSpecifiedType.java
@@ -22,12 +22,12 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.utils.MetaUtils;
 
-import org.rocksdb.RocksDBException;
-
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
+import org.rocksdb.RocksDBException;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
@@ -38,7 +38,7 @@ public class GetBelongedToSpecifiedType {
   private String[] nodes;
   private RocksDBReadWriteHandler readWriteHandler;
   protected List<String> contextNodes = new ArrayList<>();
-  private Set<PartialPath> allResult;
+  private Set<PartialPath> allResult = new HashSet<>();
   private char nodeType;
 
   public GetBelongedToSpecifiedType(

[iotdb] 14/45: change the parameter type in function

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4fa0f53da9f02958bffdcc97e8b29cce63c97ccf
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 9 12:19:29 2022 +0800

    change the parameter type in function
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 26 +++++++++-------------
 .../metadata/rocksdb/mnode/RMeasurementMNode.java  |  4 ++--
 2 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index f8a0f41..a1f5aa9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -959,7 +959,7 @@ public class MRocksDBManager implements IMetaManager {
   public void traverseOutcomeBasins(
       String[] nodes,
       int maxLevel,
-      BiFunction<String, byte[], Boolean> function,
+      BiFunction<byte[], byte[], Boolean> function,
       Character[] nodeTypeArray)
       throws IllegalPathException {
     List<String[]> allNodesArray = RocksDBUtils.replaceMultiWildcardToSingle(nodes, maxLevel);
@@ -967,7 +967,7 @@ public class MRocksDBManager implements IMetaManager {
   }
 
   public void traverseByPatternPath(
-      String[] nodes, BiFunction<String, byte[], Boolean> function, Character[] nodeTypeArray) {
+      String[] nodes, BiFunction<byte[], byte[], Boolean> function, Character[] nodeTypeArray) {
     //    String[] nodes = pathPattern.getNodes();
 
     int startIndex = 0;
@@ -985,11 +985,7 @@ public class MRocksDBManager implements IMetaManager {
                   Holder<byte[]> holder = new Holder<>();
                   readWriteHandler.keyExist(levelPrefix.getBytes(), holder);
                   if (holder.getValue() != null) {
-                    StringBuilder stringBuilder = new StringBuilder();
-                    for (String node : nodes) {
-                      stringBuilder.append(RockDBConstants.PATH_SEPARATOR).append(node);
-                    }
-                    function.apply(stringBuilder.substring(1), holder.getValue());
+                    function.apply(levelPrefix.getBytes(), holder.getValue());
                   }
                 } catch (RocksDBException e) {
                   logger.error(e.getMessage());
@@ -1034,9 +1030,7 @@ public class MRocksDBManager implements IMetaManager {
                             }
                             if (RocksDBUtils.suffixMatch(iterator.key(), suffixToMatch)) {
                               if (lastIteration) {
-                                function.apply(
-                                    RocksDBUtils.getPathByInnerName(new String(iterator.key())),
-                                    iterator.value());
+                                function.apply(iterator.key(), iterator.value());
                               } else {
                                 tempNodes.add(RocksDBUtils.toMetaNodes(iterator.key()));
                               }
@@ -1169,7 +1163,7 @@ public class MRocksDBManager implements IMetaManager {
 
   private int getCountByNodeType(Character[] nodetype, String[] nodes) throws IllegalPathException {
     AtomicInteger atomicInteger = new AtomicInteger(0);
-    BiFunction<String, byte[], Boolean> function =
+    BiFunction<byte[], byte[], Boolean> function =
         (a, b) -> {
           atomicInteger.incrementAndGet();
           return true;
@@ -1428,9 +1422,9 @@ public class MRocksDBManager implements IMetaManager {
       String[] nodes, Character[] nodetype, Collection<PartialPath> collection)
       throws IllegalPathException {
     List<String> allResult = Collections.synchronizedList(new ArrayList<>());
-    BiFunction<String, byte[], Boolean> function =
+    BiFunction<byte[], byte[], Boolean> function =
         (a, b) -> {
-          allResult.add(a);
+          allResult.add(RocksDBUtils.getPathByInnerName(new String(a)));
           return true;
         };
     traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, nodetype);
@@ -1582,9 +1576,11 @@ public class MRocksDBManager implements IMetaManager {
   private List<MeasurementPath> getMatchedMeasurementPath(String[] nodes)
       throws IllegalPathException {
     List<MeasurementPath> allResult = Collections.synchronizedList(new ArrayList<>());
-    BiFunction<String, byte[], Boolean> function =
+    BiFunction<byte[], byte[], Boolean> function =
         (a, b) -> {
-          allResult.add(new RMeasurementMNode(a, b).getMeasurementPath());
+          allResult.add(
+              new RMeasurementMNode(RocksDBUtils.getPathByInnerName(new String(a)), b)
+                  .getMeasurementPath());
           return true;
         };
     traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMeasurementMNode.java
index 95ffdde..e619d3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/mnode/RMeasurementMNode.java
@@ -63,7 +63,7 @@ public class RMeasurementMNode extends RMNode implements IMeasurementMNode {
 
   @Override
   public IEntityMNode getParent() {
-    if (parent == null) {
+    if (super.getParent() == null) {
       return null;
     }
     return parent.getAsEntityMNode();
@@ -72,7 +72,7 @@ public class RMeasurementMNode extends RMNode implements IMeasurementMNode {
   @Override
   public MeasurementPath getMeasurementPath() {
     MeasurementPath result = new MeasurementPath(super.getPartialPath(), schema);
-    result.setUnderAlignedEntity(getParent().isAligned());
+    //    result.setUnderAlignedEntity(getParent().isAligned());
     if (alias != null && !alias.isEmpty()) {
       result.setMeasurementAlias(alias);
     }
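
    With the callbacks above now receiving raw RocksDB keys, call sites convert them back with
    RocksDBUtils.getPathByInnerName. A standalone sketch of the inverse of the encoding sketched
    earlier (drop the node-type prefix and the level digits after each separator); it assumes
    single-character node types, numeric level markers, and node names that do not start with a
    digit.

        // innerKeyToPath("sroot.2a.2bbb") returns "root.a.bbb"
        static String innerKeyToPath(String innerKey) {
          StringBuilder sb = new StringBuilder();
          boolean afterSeparator = false;
          for (int i = 1; i < innerKey.length(); i++) {   // index 0 is the node-type character
            char c = innerKey.charAt(i);
            if (afterSeparator && Character.isDigit(c)) {
              continue;                                   // swallow the level marker
            }
            afterSeparator = (c == '.');
            sb.append(c);
          }
          return sb.toString();
        }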

[iotdb] 17/45: update the exception information in the log

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 535dd2cb3208f365113409f876fe7ad06d48735d
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 9 16:26:32 2022 +0800

    update the exception information in the log
---
 .../java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java  | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 7f4f0a0..8903cca 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -814,7 +814,11 @@ public class MRocksDBManager implements IMetaManager {
               }
             } else {
               boolean hasChild = !keyCheckResult.getResult(RocksDBMNodeType.STORAGE_GROUP);
-              throw new StorageGroupAlreadySetException(storageGroup.toString(), hasChild);
+              StringBuilder stringBuilder = new StringBuilder();
+              for (int j = 0; j <= i; j++) {
+                stringBuilder.append(RockDBConstants.PATH_SEPARATOR).append(nodes[j]);
+              }
+              throw new StorageGroupAlreadySetException(stringBuilder.substring(1), hasChild);
             }
           } finally {
             lock.unlock();
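
    The StringBuilder loop above reports the prefix that is already set (nodes[0] .. nodes[i])
    instead of the full requested path. A standalone one-liner with the same effect, assuming
    '.' as the separator:

        // prefixPath(new String[] {"root", "sg", "d1"}, 1) returns "root.sg"
        static String prefixPath(String[] nodes, int i) {
          return String.join(".", java.util.Arrays.copyOfRange(nodes, 0, i + 1));
        }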

[iotdb] 21/45: [rocksdb] fix the bug that upsert alias fail, refine delete timeseries logic

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 40d09a487cc2cbee05b0e02d248d578a11ccc5cb
Author: chengjianyun <ch...@360.cn>
AuthorDate: Wed Mar 9 19:29:00 2022 +0800

    [rocksdb] fix the bug that upsert alias fail, refine delete timeseries logic
---
 .../iotdb/db/metadata/rocksdb/CheckKeyResult.java  |  27 +--
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 192 +++++++++++++--------
 .../metadata/rocksdb/RocksDBReadWriteHandler.java  |  11 +-
 .../iotdb/db/metadata/rocksdb/RocksDBUtils.java    |  13 ++
 .../db/metadata/rocksdb/MRocksDBUnitTest.java      |  78 ++++++++-
 5 files changed, 231 insertions(+), 90 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/CheckKeyResult.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/CheckKeyResult.java
index 25f9ab1..69c7cd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/CheckKeyResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/CheckKeyResult.java
@@ -19,23 +19,15 @@
 
 package org.apache.iotdb.db.metadata.rocksdb;
 
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.MAX_NODE_TYPE_NUM;
+import static org.apache.iotdb.db.metadata.rocksdb.RocksDBUtils.NODE_TYPE_ARRAY;
 
 public class CheckKeyResult {
 
-  private boolean[] result = new boolean[MAX_NODE_TYPE_NUM];
-  private boolean existAnyKey = false;
   private byte[] value;
-
-  public void setSingleCheckValue(char index, boolean value) {
-    if (value) {
-      existAnyKey = true;
-    }
-    result[index] = value;
-  }
+  private RocksDBMNodeType nodeType;
 
   public boolean existAnyKey() {
-    return existAnyKey;
+    return nodeType != null;
   }
 
   public byte[] getValue() {
@@ -46,7 +38,18 @@ public class CheckKeyResult {
     this.value = value;
   }
 
+  public void setExistType(char type) {
+    nodeType = NODE_TYPE_ARRAY[type];
+  }
+
+  public RocksDBMNodeType getExistType() {
+    return nodeType;
+  }
+
   public boolean getResult(RocksDBMNodeType type) {
-    return result[type.value];
+    if (type == nodeType) {
+      return true;
+    }
+    return false;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 3c8523f..671fcbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -80,6 +80,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
@@ -96,6 +97,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -630,7 +632,8 @@ public class MRocksDBManager implements IMetaManager {
   @Override
   public String deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
-    Set<String> failedNames = new HashSet<>();
+    Set<String> failedNames = ConcurrentHashMap.newKeySet();
+    //    Set<String> parentToCheck = ConcurrentHashMap.newKeySet();
     traverseOutcomeBasins(
         pathPattern.getNodes(),
         MAX_PATH_DEPTH,
@@ -638,13 +641,11 @@ public class MRocksDBManager implements IMetaManager {
           String path = null;
           RMeasurementMNode deletedNode = null;
           try {
-            path = RocksDBUtils.getPathByInnerName(key);
-            PartialPath partialPath = new PartialPath(path);
-            String levelPath =
-                RocksDBUtils.getLevelPath(partialPath.getNodes(), partialPath.getNodeLength() - 1);
+            path = RocksDBUtils.getPathByInnerName(new String(key));
+            String[] nodes = MetaUtils.splitPathToDetachedPath(path);
+            String levelPath = RocksDBUtils.getLevelPath(nodes, nodes.length - 1);
             // Delete measurement node
-            Lock lock = locksPool.get(levelPath);
+            Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
             if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
               try {
                 deletedNode = new RMeasurementMNode(path, value);
@@ -652,9 +653,8 @@ public class MRocksDBManager implements IMetaManager {
                 // delete the last node of path
                 batch.delete(key);
                 if (deletedNode.getAlias() != null) {
-                  String[] aliasNodes =
-                      Arrays.copyOf(partialPath.getNodes(), partialPath.getNodeLength());
-                  aliasNodes[partialPath.getNodeLength() - 1] = deletedNode.getAlias();
+                  String[] aliasNodes = Arrays.copyOf(nodes, nodes.length);
+                  aliasNodes[nodes.length - 1] = deletedNode.getAlias();
                   String aliasLevelPath =
                       RocksDBUtils.getLevelPath(aliasNodes, aliasNodes.length - 1);
                   batch.delete(RocksDBUtils.toAliasNodeKey(aliasLevelPath));
@@ -670,53 +670,79 @@ public class MRocksDBManager implements IMetaManager {
             } else {
               throw new AcquireLockTimeoutException("acquire lock timeout, " + path);
             }
+
+            //            if (nodes.length > 1) {
+            //              // Only try to delete directly parent if no other siblings
+            //              parentToCheck.add(
+            //                  String.join(PATH_SEPARATOR, Arrays.copyOf(nodes, nodes.length -
+            // 1)));
+            //            }
+          } catch (IllegalPathException e) {
           } catch (Exception e) {
             logger.error("delete timeseries [{}] fail", path, e);
             failedNames.add(path);
-          }
-
-          // delete parent node if is empty
-          IMNode parentNode = deletedNode.getParent();
-          Lock curLock = locksPool.computeIfAbsent(curLevelPath, x -> new ReentrantLock());
-          try {
-            while (parentNode != null) {
-              // TODO: check children size
-              if (!parentNode.isEmptyInternal() || parentNode.isStorageGroup()) {
-                break;
-              } else {
-                PartialPath parentPath = parentNode.getPartialPath();
-                String parentLevelPath =
-                    RocksDBUtils.getLevelPath(
-                        parentPath.getNodes(), parentPath.getNodeLength() - 1);
-                Lock curLock = locksPool.get(parentLevelPath);
-                if (curLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
-                  try {
-                    if (parentNode.isEntity()) {
-                      // TODO: aligned timeseries needs special check????
-                      readWriteHandler.deleteNode(
-                          parentNode.getPartialPath().getNodes(), RocksDBMNodeType.ENTITY);
-                    } else {
-                      readWriteHandler.deleteNode(
-                          parentNode.getPartialPath().getNodes(), RocksDBMNodeType.INTERNAL);
-                    }
-                    parentNode = parentNode.getParent();
-                  } finally {
-                    curLock.unlock();
-                  }
-                } else {
-                  throw new AcquireLockTimeoutException("acquire lock timeout, " + parentLevelPath);
-                }
-              }
-            }
-            // TODO: trigger engine update
-            // TODO: update totalTimeSeriesNumber
-          } catch (Exception e) {
-            logger.error("delete timeseries [{}] fail", parentNode.getFullPath(), e);
-            failedNames.add(parentNode.getFullPath());
+            return false;
           }
           return true;
         },
         new Character[] {NODE_TYPE_MEASUREMENT});
+
+    // TODO: do we need to delete parent??
+    //    parentToCheck
+    //        .parallelStream()
+    //        .forEach(
+    //            x -> {
+    //              try {
+    //                String[] parentNodes = MetaUtils.splitPathToDetachedPath(x);
+    //                String levelPathPrefix =
+    //                    RocksDBUtils.getLevelPathPrefix(
+    //                        parentNodes, parentNodes.length - 1, parentNodes.length);
+    //                AtomicInteger childrenCnt = new AtomicInteger(0);
+    //
+    //                Arrays.stream(ALL_NODE_TYPE_ARRAY)
+    //                    .parallel()
+    //                    .forEach(
+    //                        type -> {
+    //                          byte[] prefixKey = RocksDBUtils.toRocksDBKey(levelPathPrefix, type);
+    //                          readWriteHandler.traverseByPrefix(
+    //                              prefixKey, (bytes, bytes2) -> childrenCnt.getAndIncrement());
+    //                        });
+    //
+    //                if (childrenCnt.get() <= 0) {
+    //                  String parentLevelPath =
+    //                      RocksDBUtils.getLevelPath(parentNodes, parentNodes.length - 1);
+    //                  Lock curLock = locksPool.get(parentLevelPath);
+    //                  if (curLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+    //                    try {
+    //                      CheckKeyResult result =
+    // readWriteHandler.keyExistByAllTypes(parentLevelPath);
+    //                      if (result.getResult(RocksDBMNodeType.ENTITY)) {
+    //                        System.out.println("delete entity node: {}" + parentLevelPath);
+    //                        byte[] key = RocksDBUtils.toEntityNodeKey(parentLevelPath);
+    //                        readWriteHandler.deleteByKey(key);
+    //                      } else if (result.getResult(RocksDBMNodeType.INTERNAL)) {
+    //                        System.out.println("delete internal node: {}" + parentLevelPath);
+    //                        byte[] key = RocksDBUtils.toInternalNodeKey(parentLevelPath);
+    //                        readWriteHandler.deleteByKey(key);
+    //                      } else {
+    //                        System.out.println(
+    //                            "delete type node: {}" + result.getExistType() + " " +
+    // parentLevelPath);
+    //                      }
+    //                    } finally {
+    //                      curLock.unlock();
+    //                    }
+    //                  } else {
+    //                    throw new AcquireLockTimeoutException(
+    //                        "acquire lock timeout, " + parentLevelPath);
+    //                  }
+    //                }
+    //              } catch (IllegalPathException e) {
+    //              } catch (Exception e) {
+    //                logger.error("delete parent of timeseries fail", e);
+    //              }
+    //            });
+
     return failedNames.isEmpty() ? null : String.join(",", failedNames);
   }
 
@@ -1878,6 +1904,14 @@ public class MRocksDBManager implements IMetaManager {
       Holder<byte[]> holder = new Holder<>();
       if (readWriteHandler.keyExistByType(key, RocksDBMNodeType.MEASUREMENT, holder)) {
         node = new RMeasurementMNode(fullPath.getFullPath(), holder.getValue());
+      } else if (readWriteHandler.keyExistByType(key, RocksDBMNodeType.ALISA, holder)) {
+        byte[] aliasValue = holder.getValue();
+        if (aliasValue != null) {
+          ByteBuffer byteBuffer = ByteBuffer.wrap(aliasValue);
+          // skip the fixed 3-byte prefix of the alias node value before the origin key block
+          ReadWriteIOUtils.readBytes(byteBuffer, 3);
+          byte[] oriKey = RocksDBUtils.readOriginKey(byteBuffer);
+          node = new RMeasurementMNode(fullPath.getFullPath(), readWriteHandler.get(null, oriKey));
+        }
       }
       return node;
     } catch (RocksDBException e) {
@@ -1888,8 +1922,8 @@ public class MRocksDBManager implements IMetaManager {
 
   // region Interfaces for alias and tag/attribute operations
   @Override
-  public void changeAlias(PartialPath path, String alias) throws MetadataException, IOException {
-    upsertTagsAndAttributes(alias, null, null, path);
+  public void changeAlias(PartialPath path, String newAlias) throws MetadataException, IOException {
+    upsertTagsAndAttributes(newAlias, null, null, path);
   }
 
   @Override
@@ -1905,10 +1939,15 @@ public class MRocksDBManager implements IMetaManager {
       Lock rawKeyLock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
       if (rawKeyLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
         try {
+          boolean hasUpdate = false;
           String[] nodes = path.getNodes();
           RMeasurementMNode mNode = (RMeasurementMNode) getMeasurementMNode(path);
           // upsert alias
-          if (StringUtils.isEmpty(mNode.getAlias()) || !mNode.getAlias().equals(alias)) {
+          if (StringUtils.isNotEmpty(alias)
+              && (StringUtils.isEmpty(mNode.getAlias()) || !mNode.getAlias().equals(alias))) {
+            String oldAliasStr = mNode.getAlias();
+            mNode.setAlias(alias);
+            hasUpdate = true;
             WriteBatch batch = new WriteBatch();
             String[] newAlias = Arrays.copyOf(nodes, nodes.length);
             newAlias[nodes.length - 1] = alias;
@@ -1918,39 +1957,43 @@ public class MRocksDBManager implements IMetaManager {
             Lock newAliasLock = locksPool.computeIfAbsent(newAliasLevel, x -> new ReentrantLock());
             Lock oldAliasLock = null;
             boolean lockedOldAlias = false;
+            boolean lockedNewAlias = false;
             try {
               if (newAliasLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+                lockedNewAlias = true;
                 if (readWriteHandler.keyExistByAllTypes(newAliasLevel).existAnyKey()) {
                   throw new PathAlreadyExistException("Alias node has exist: " + newAliasLevel);
                 }
                 batch.put(newAliasKey, RocksDBUtils.buildAliasNodeValue(originKey));
-              } else {
-                throw new AcquireLockTimeoutException("acquire lock timeout: " + newAliasLevel);
-              }
-
-              if (StringUtils.isNotEmpty(mNode.getAlias()) && !mNode.getAlias().equals(alias)) {
-                String[] oldAlias = Arrays.copyOf(nodes, nodes.length);
-                oldAlias[nodes.length - 1] = mNode.getAlias();
-                String oldAliasLevel = RocksDBUtils.getLevelPath(oldAlias, oldAlias.length - 1);
-                byte[] oldAliasKey = RocksDBUtils.toAliasNodeKey(oldAliasLevel);
-                oldAliasLock = locksPool.computeIfAbsent(oldAliasLevel, x -> new ReentrantLock());
-                if (oldAliasLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
-                  lockedOldAlias = true;
-                  if (!readWriteHandler.keyExist(oldAliasKey)) {
-                    logger.error(
-                        "origin node [{}] has alias but alias node [{}] doesn't exist ",
-                        levelPath,
-                        oldAliasLevel);
+                if (StringUtils.isNotEmpty(oldAliasStr)) {
+                  String[] oldAliasNodes = Arrays.copyOf(nodes, nodes.length);
+                  oldAliasNodes[nodes.length - 1] = oldAliasStr;
+                  String oldAliasLevel =
+                      RocksDBUtils.getLevelPath(oldAliasNodes, oldAliasNodes.length - 1);
+                  byte[] oldAliasKey = RocksDBUtils.toAliasNodeKey(oldAliasLevel);
+                  oldAliasLock = locksPool.computeIfAbsent(oldAliasLevel, x -> new ReentrantLock());
+                  if (oldAliasLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+                    lockedOldAlias = true;
+                    if (!readWriteHandler.keyExist(oldAliasKey)) {
+                      logger.error(
+                          "origin node [{}] has alias but alias node [{}] doesn't exist ",
+                          levelPath,
+                          oldAliasLevel);
+                    }
+                    batch.delete(oldAliasKey);
+                  } else {
+                    throw new AcquireLockTimeoutException("acquire lock timeout: " + oldAliasLevel);
                   }
-                  batch.delete(oldAliasKey);
-                } else {
-                  throw new AcquireLockTimeoutException("acquire lock timeout: " + oldAliasLevel);
                 }
+              } else {
+                throw new AcquireLockTimeoutException("acquire lock timeout: " + newAliasLevel);
               }
               // TODO: need application lock
               readWriteHandler.executeBatch(batch);
             } finally {
-              newAliasLock.unlock();
+              if (lockedNewAlias) {
+                newAliasLock.unlock();
+              }
               if (oldAliasLock != null && lockedOldAlias) {
                 oldAliasLock.unlock();
               }
@@ -1958,7 +2001,6 @@ public class MRocksDBManager implements IMetaManager {
           }
 
           WriteBatch batch = new WriteBatch();
-          boolean hasUpdate = false;
           if (tagsMap != null && !tagsMap.isEmpty()) {
             if (mNode.getTags() == null) {
               mNode.setTags(tagsMap);
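
The hunks above all follow the same locking discipline: fetch (or lazily create) exactly one
ReentrantLock per level path with computeIfAbsent, acquire it with a bounded tryLock, and unlock
only when the acquisition actually succeeded, which is what the lockedNewAlias flag added here
(together with the existing lockedOldAlias flag) guarantees. Below is a minimal, self-contained
sketch of that discipline; LevelPathLockPool, runLocked and the timeout value are illustrative
names, not IoTDB API.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Supplier;

    public class LevelPathLockPool {
      // assumption: the real MAX_LOCK_WAIT_TIME in MRocksDBManager may use a different value
      private static final long MAX_LOCK_WAIT_TIME_MS = 300;

      private final ConcurrentMap<String, Lock> locksPool = new ConcurrentHashMap<>();

      public <T> T runLocked(String levelPath, Supplier<T> action) throws InterruptedException {
        // computeIfAbsent guarantees a single Lock instance per level path,
        // which is why the patch replaces the plain locksPool.get(levelPath)
        Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
        if (!lock.tryLock(MAX_LOCK_WAIT_TIME_MS, TimeUnit.MILLISECONDS)) {
          throw new IllegalStateException("acquire lock timeout, " + levelPath);
        }
        // from here on the lock is known to be held, so unlocking in finally is always legal;
        // this is the invariant the lockedNewAlias/lockedOldAlias flags enforce by hand
        try {
          return action.get();
        } finally {
          lock.unlock();
        }
      }

      public static void main(String[] args) throws InterruptedException {
        LevelPathLockPool pool = new LevelPathLockPool();
        System.out.println(pool.runLocked("root.2.sg.2", () -> "did work under root.2.sg.2"));
      }
    }

Wrapping acquire and release in one helper makes it impossible to call unlock() on a lock that was
never obtained, which is exactly the failure mode the flag-based code above defends against.
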
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
index 0e701ea..69d8e7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
@@ -60,6 +60,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
@@ -246,7 +247,7 @@ public class RocksDBReadWriteHandler {
                   Holder<byte[]> holder = new Holder<>();
                   boolean keyExisted = keyExist(key, holder);
                   if (keyExisted) {
-                    result.setSingleCheckValue(x.value, true);
+                    result.setExistType(x.value);
                     result.setValue(holder.getValue());
                   }
                 } catch (RocksDBException e) {
@@ -437,6 +438,14 @@ public class RocksDBReadWriteHandler {
     return result;
   }
 
+  public void traverseByPrefix(byte[] prefix, BiConsumer<byte[], byte[]> consumer) {
+    RocksIterator iterator = rocksDB.newIterator();
+    for (iterator.seek(prefix); iterator.isValid(); iterator.next()) {
+      if (RocksDBUtils.startWith(iterator.key(), prefix))
+        consumer.accept(iterator.key(), iterator.value());
+    }
+  }
+
   public String findBelongToSpecifiedNodeType(String[] nodes, char nodeType) {
     String innerPathName;
     for (int level = nodes.length; level > 0; level--) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index a1d37cb..4aed3a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -544,4 +544,17 @@ public class RocksDBUtils {
     }
     return stringBuilder.substring(1);
   }
+
+  public static boolean startWith(byte[] a, byte[] b) {
+    if (a.length < b.length) {
+      return false;
+    }
+
+    for (int i = 0; i < b.length; i++) {
+      if (a[i] != b[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
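
Taken together, the traverseByPrefix added to RocksDBReadWriteHandler and the startWith helper
above implement an ordinary prefix scan over RocksDB's sorted keyspace: seek to the prefix, then
walk forward while keys still carry it. A runnable sketch of the same idea against a throwaway
database follows; the database path and key strings are made up and do not follow IoTDB's inner
key encoding.

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.RocksIterator;

    import java.nio.charset.StandardCharsets;
    import java.util.function.BiConsumer;

    public class PrefixScanSketch {
      public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
            RocksDB db = RocksDB.open(options, "/tmp/prefix-scan-demo")) {
          db.put(bytes("sroot.2.sg.1"), bytes("v1"));
          db.put(bytes("sroot.2.sg.2"), bytes("v2"));
          db.put(bytes("troot.2.sg.1"), bytes("v3"));

          traverseByPrefix(db, bytes("sroot.2.sg"),
              (k, v) -> System.out.println(new String(k, StandardCharsets.UTF_8)));
        }
      }

      // same shape as the new RocksDBReadWriteHandler#traverseByPrefix:
      // seek to the prefix, then visit keys until they stop matching it
      static void traverseByPrefix(RocksDB db, byte[] prefix, BiConsumer<byte[], byte[]> consumer) {
        try (RocksIterator it = db.newIterator()) {
          for (it.seek(prefix); it.isValid(); it.next()) {
            if (!startsWith(it.key(), prefix)) {
              break; // keys are sorted, so the first mismatch ends the prefix range
            }
            consumer.accept(it.key(), it.value());
          }
        }
      }

      // byte-wise prefix test, mirroring RocksDBUtils#startWith
      static boolean startsWith(byte[] a, byte[] b) {
        if (a.length < b.length) {
          return false;
        }
        for (int i = 0; i < b.length; i++) {
          if (a[i] != b[i]) {
            return false;
          }
        }
        return true;
      }

      static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
      }
    }

One difference: the sketch breaks out of the loop at the first non-matching key, which is safe
because the iterator returns keys in sorted order, whereas traverseByPrefix in the hunk keeps
iterating to the end of the keyspace and merely filters, so it returns the same keys but scans
more than it needs to.
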
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java
index 4aca957..388fef9 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java
@@ -1,6 +1,7 @@
 package org.apache.iotdb.db.metadata.rocksdb;
 
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.utils.FileUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -114,10 +115,22 @@ public class MRocksDBUnitTest {
     mRocksDBManager.createTimeseries(
         path, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, null);
 
+    IMeasurementMNode m1 = mRocksDBManager.getMeasurementMNode(path);
+    Assert.assertNull(m1.getAlias());
+    Assert.assertEquals(m1.getSchema().getCompressor(), CompressionType.UNCOMPRESSED);
+    Assert.assertEquals(m1.getSchema().getEncodingType(), TSEncoding.PLAIN);
+    Assert.assertEquals(m1.getSchema().getType(), TSDataType.TEXT);
+    Assert.assertNull(m1.getSchema().getProps());
+
     PartialPath path2 = new PartialPath("root.tt.sg.dd.m2");
     mRocksDBManager.createTimeseries(
-        path2, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, "ma");
-    mRocksDBManager.printScanAllKeys();
+        path2, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.GZIP, null, "ma");
+    IMeasurementMNode m2 = mRocksDBManager.getMeasurementMNode(path2);
+    Assert.assertEquals(m2.getAlias(), "ma");
+    Assert.assertEquals(m2.getSchema().getCompressor(), CompressionType.GZIP);
+    Assert.assertEquals(m2.getSchema().getEncodingType(), TSEncoding.PLAIN);
+    Assert.assertEquals(m2.getSchema().getType(), TSDataType.DOUBLE);
+    Assert.assertNull(m2.getSchema().getProps());
   }
 
   @Test
@@ -221,6 +234,67 @@ public class MRocksDBUnitTest {
     //    mRocksDBManager.traverseByPatternPath(new PartialPath("root.sg.d1.*"));
   }
 
+  @Test
+  public void testDeleteTimeseries() throws MetadataException, IOException {
+    List<PartialPath> timeseries = new ArrayList<>();
+    timeseries.add(new PartialPath("root.sg.d1.m1"));
+    timeseries.add(new PartialPath("root.sg.d1.m2"));
+    timeseries.add(new PartialPath("root.sg.d2.m1"));
+    timeseries.add(new PartialPath("root.sg.d2.m2"));
+    timeseries.add(new PartialPath("root.sg.d3.m1"));
+    timeseries.add(new PartialPath("root.sg.d3.m2"));
+    timeseries.add(new PartialPath("root.sg1.d1.m1"));
+    timeseries.add(new PartialPath("root.sg1.d1.m2"));
+    timeseries.add(new PartialPath("root.sg1.d2.m1"));
+    timeseries.add(new PartialPath("root.sg1.d2.m2"));
+
+    for (PartialPath path : timeseries) {
+      mRocksDBManager.createTimeseries(
+          path, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, null);
+    }
+
+    Assert.assertEquals(
+        mRocksDBManager.getAllTimeseriesCount(new PartialPath("root.**")), timeseries.size());
+
+    int count = timeseries.size();
+    mRocksDBManager.deleteTimeseries(new PartialPath("root.sg.d1.*"));
+    Assert.assertEquals(
+        mRocksDBManager.getAllTimeseriesCount(new PartialPath("root.**")), count - 2);
+
+    count = count - 2;
+    mRocksDBManager.deleteTimeseries(new PartialPath("root.sg1.**"));
+    Assert.assertEquals(
+        mRocksDBManager.getAllTimeseriesCount(new PartialPath("root.**")), count - 4);
+
+    count = count - 4;
+    mRocksDBManager.deleteTimeseries(new PartialPath("root.sg.*.m1"));
+    Assert.assertEquals(
+        mRocksDBManager.getAllTimeseriesCount(new PartialPath("root.**")), count - 2);
+
+    mRocksDBManager.printScanAllKeys();
+  }
+
+  @Test
+  public void testUpsert() throws MetadataException, IOException {
+    PartialPath path2 = new PartialPath("root.tt.sg.dd.m2");
+    mRocksDBManager.createTimeseries(
+        path2, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, "ma");
+
+    IMeasurementMNode m1 = mRocksDBManager.getMeasurementMNode(new PartialPath("root.tt.sg.dd.m2"));
+    Assert.assertEquals(m1.getAlias(), "ma");
+
+    mRocksDBManager.changeAlias(new PartialPath("root.tt.sg.dd.m2"), "test");
+
+    IMeasurementMNode m2 = mRocksDBManager.getMeasurementMNode(new PartialPath("root.tt.sg.dd.m2"));
+    Assert.assertEquals(m2.getAlias(), "test");
+
+    mRocksDBManager.printScanAllKeys();
+
+    IMeasurementMNode m3 =
+        mRocksDBManager.getMeasurementMNode(new PartialPath("root.tt.sg.dd.test"));
+    Assert.assertEquals(m3.getAlias(), "test");
+  }
+
   @After
   public void clean() {
     mRocksDBManager.close();

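
testUpsert checks that after changeAlias the same node is readable through both root.tt.sg.dd.m2
and root.tt.sg.dd.test. That works because an alias key stores only a pointer to the origin key,
and getMeasurementMNode follows that pointer (the ByteBuffer handling added in the
getMeasurementMNode hunk earlier in this patch). A deliberately simplified, in-memory model of
that indirection; the key strings and the two-map layout are illustrative and ignore the real
value prefix and RocksDB storage.

    import java.util.HashMap;
    import java.util.Map;

    public class AliasLookupSketch {
      // origin key -> serialized measurement node
      static final Map<String, byte[]> measurementTable = new HashMap<>();
      // alias key -> origin key (the only thing an alias node stores)
      static final Map<String, String> aliasTable = new HashMap<>();

      public static void main(String[] args) {
        measurementTable.put("m.root.2.tt.sg.dd.m2", "schema-bytes".getBytes());
        aliasTable.put("a.root.2.tt.sg.dd.test", "m.root.2.tt.sg.dd.m2");

        System.out.println(new String(resolve("m.root.2.tt.sg.dd.m2")));   // direct hit
        System.out.println(new String(resolve("a.root.2.tt.sg.dd.test"))); // resolved via alias
      }

      static byte[] resolve(String key) {
        byte[] direct = measurementTable.get(key);
        if (direct != null) {
          return direct; // a measurement key exists for this path, use it as-is
        }
        String originKey = aliasTable.get(key);
        return originKey == null ? null : measurementTable.get(originKey);
      }
    }
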
[iotdb] 44/45: fix bug of the incorrect number by counting nodes

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 83da91c220951bd4558264334f1608c17d315564
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 16 17:58:30 2022 +0800

    fix bug of the incorrect number by counting nodes
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 57 ++++++++++++++--------
 .../metadata/rocksdb/RocksDBReadWriteHandler.java  | 18 +++++--
 2 files changed, 49 insertions(+), 26 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 95f23e4..459e924 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -118,6 +118,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
@@ -1174,14 +1175,22 @@ public class MRocksDBManager implements IMetaManager {
     }
     String innerNameByLevel =
         RocksDBUtils.getLevelPath(pathPattern.getNodes(), pathPattern.getNodeLength() - 1, level);
-    for (RocksDBMNodeType type : RocksDBMNodeType.values()) {
-      String getKeyByInnerNameLevel = type.value + innerNameByLevel;
-      int queryResult = readWriteHandler.getKeyByPrefix(getKeyByInnerNameLevel).size();
-      if (queryResult != 0) {
-        return queryResult;
-      }
-    }
-    return 0;
+    AtomicInteger atomicInteger = new AtomicInteger(0);
+    Function<String, Boolean> function =
+        s -> {
+          atomicInteger.incrementAndGet();
+          return true;
+        };
+    Arrays.stream(ALL_NODE_TYPE_ARRAY)
+        .parallel()
+        .forEach(
+            x -> {
+              String getKeyByInnerNameLevel =
+                  x + innerNameByLevel + RockDBConstants.PATH_SEPARATOR + level;
+              readWriteHandler.getKeyByPrefix(getKeyByInnerNameLevel, function);
+            });
+
+    return atomicInteger.get();
   }
 
   /**
@@ -1282,13 +1291,17 @@ public class MRocksDBManager implements IMetaManager {
                 pathPattern.getNodeLength())
             + RockDBConstants.PATH_SEPARATOR
             + pathPattern.getNodeLength();
+    Function<String, Boolean> function =
+        s -> {
+          result.add(RocksDBUtils.getPathByInnerName(s));
+          return true;
+        };
+
     Arrays.stream(ALL_NODE_TYPE_ARRAY)
         .parallel()
         .forEach(
             x -> {
-              for (String string : readWriteHandler.getKeyByPrefix(x + innerNameByLevel)) {
-                result.add(RocksDBUtils.getPathByInnerName(string));
-              }
+              readWriteHandler.getKeyByPrefix(x + innerNameByLevel, function);
             });
     return result;
   }
@@ -1420,16 +1433,18 @@ public class MRocksDBManager implements IMetaManager {
   /** Get all storage group paths */
   @Override
   public List<PartialPath> getAllStorageGroupPaths() {
-    List<PartialPath> allStorageGroupPath = new ArrayList<>();
-    Set<String> allStorageGroupInnerName =
-        readWriteHandler.getKeyByPrefix(String.valueOf(NODE_TYPE_SG));
-    for (String str : allStorageGroupInnerName) {
-      try {
-        allStorageGroupPath.add(new PartialPath(RocksDBUtils.getPathByInnerName(str)));
-      } catch (IllegalPathException e) {
-        throw new RuntimeException(e);
-      }
-    }
+    List<PartialPath> allStorageGroupPath = Collections.synchronizedList(new ArrayList<>());
+    Function<String, Boolean> function =
+        s -> {
+          try {
+            allStorageGroupPath.add(new PartialPath(RocksDBUtils.getPathByInnerName(s)));
+          } catch (IllegalPathException e) {
+            logger.error(e.getMessage());
+            return false;
+          }
+          return true;
+        };
+    readWriteHandler.getKeyByPrefix(String.valueOf(NODE_TYPE_SG), function);
     return allStorageGroupPath;
   }
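
The rewritten getNodesCountInGivenLevel and getAllStorageGroupPaths share one pattern: the prefix
scan no longer returns a Set but invokes a callback per key, and the callers aggregate into a
thread-safe holder (an AtomicInteger or a synchronized list) while the per-node-type scans run on
a parallel stream. A small in-memory sketch of that pattern; the fake keys and the single-letter
type prefixes are illustrative only.

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Function;
    import java.util.stream.Stream;

    public class CountByPrefixSketch {
      public static void main(String[] args) {
        // stand-ins for inner keys: <node type char> + level path
        List<String> fakeKeys =
            Arrays.asList("sroot.2.sg.2", "droot.2.sg.2", "mroot.2.sg.2", "sroot.2.other.2");

        AtomicInteger count = new AtomicInteger(0);
        Function<String, Boolean> onKey =
            key -> {
              count.incrementAndGet();
              return true;
            };

        // fan the scan out over the node type prefixes, the way ALL_NODE_TYPE_ARRAY is used above
        Stream.of('s', 'd', 'm')
            .parallel()
            .forEach(type -> scanByPrefix(fakeKeys, type + "root.2.sg", onKey));

        System.out.println(count.get()); // prints 3
      }

      // stands in for the callback form of RocksDBReadWriteHandler#getKeyByPrefix
      static void scanByPrefix(List<String> keys, String prefix, Function<String, Boolean> onKey) {
        keys.stream().filter(k -> k.startsWith(prefix)).forEach(onKey::apply);
      }
    }
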
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
index cac86e6..d4a3f05 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
@@ -63,7 +63,17 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ORIGIN_KEY;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_VERSION;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_FLAG;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ROOT;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.PATH_SEPARATOR;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
 
 public class RocksDBReadWriteHandler {
 
@@ -414,17 +424,15 @@ public class RocksDBReadWriteHandler {
     return rocksDB.newIterator(columnFamilyHandle);
   }
 
-  public Set<String> getKeyByPrefix(String innerName) {
+  public void getKeyByPrefix(String innerName, Function<String, Boolean> function) {
     RocksIterator iterator = rocksDB.newIterator();
-    Set<String> result = new HashSet<>();
     for (iterator.seek(innerName.getBytes()); iterator.isValid(); iterator.next()) {
       String keyStr = new String(iterator.key());
       if (!keyStr.startsWith(innerName)) {
         break;
       }
-      result.add(keyStr);
+      function.apply(keyStr);
     }
-    return result;
   }
 
   public Map<byte[], byte[]> getKeyValueByPrefix(String innerName) {
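
Switching getKeyByPrefix to a callback avoids materialising every matching key in a HashSet before
the caller even looks at it; each caller decides whether to count, collect, or transform. A caller
that still wants the old Set-returning shape can rebuild it on top of the callback, as in this
sketch where KeyScanner stands in for RocksDBReadWriteHandler:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    public class KeyCollectorSketch {
      // minimal stand-in for the callback-style scanner
      interface KeyScanner {
        void getKeyByPrefix(String prefix, Function<String, Boolean> onKey);
      }

      static Set<String> collectKeys(KeyScanner scanner, String prefix) {
        Set<String> keys = ConcurrentHashMap.newKeySet(); // safe even if the scan is parallelised
        scanner.getKeyByPrefix(prefix, key -> keys.add(key));
        return keys;
      }

      public static void main(String[] args) {
        KeyScanner fake =
            (prefix, onKey) -> {
              onKey.apply(prefix + ".d1");
              onKey.apply(prefix + ".d2");
            };
        System.out.println(collectKeys(fake, "sroot.1.sg")); // prints both keys, order not guaranteed
      }
    }
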

[iotdb] 07/45: [rocksdb] complete metadata transfer from mmanage to mrockdb

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e6e7bc0e18ef9d9487c47bdde4a451dd158ddebd
Author: chengjianyun <ch...@360.cn>
AuthorDate: Wed Mar 2 18:38:53 2022 +0800

    [rocksdb] complete metadata transfer from mmanage to mrockdb
---
 .../metadata/AcquireLockTimeoutException.java      |   7 +
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java |  51 ++--
 .../db/metadata/rocksdb/MetaDataTransfer.java      | 278 +++++++++++++++++----
 .../metadata/rocksdb/RocksDBReadWriteHandler.java  |   2 +-
 .../{MRocksDBTest.java => MRocksDBBenchmark.java}  |  30 +--
 ...TestEngine.java => RocksDBBenchmarkEngine.java} |  16 +-
 ...ksDBTestTask.java => RocksDBBenchmarkTask.java} |   4 +-
 .../db/metadata/rocksdb/RocksDBTestUtils.java      |   5 +-
 8 files changed, 284 insertions(+), 109 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/exception/metadata/AcquireLockTimeoutException.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/AcquireLockTimeoutException.java
new file mode 100644
index 0000000..81c39d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/AcquireLockTimeoutException.java
@@ -0,0 +1,7 @@
+package org.apache.iotdb.db.exception.metadata;
+
+public class AcquireLockTimeoutException extends MetadataException {
+  public AcquireLockTimeoutException(String msg) {
+    super(msg);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index ec2bd73..e408263 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.metadata.rocksdb;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
 import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
@@ -118,17 +119,8 @@ import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_NODE_VALUE;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_SCHEMA;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
+import static org.apache.iotdb.db.metadata.rocksdb.RocksDBUtils.*;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 /**
@@ -386,7 +378,6 @@ public class MRocksDBManager implements IMetaManager {
     Holder<byte[]> holder = new Holder<>();
     Lock lock = locksPool.get(levelPath);
     if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
-      Thread.sleep(5);
       lockedLocks.push(lock);
       try {
         CheckKeyResult checkResult = readWriteHandler.keyExistByAllTypes(levelPath, holder);
@@ -402,12 +393,12 @@ public class MRocksDBManager implements IMetaManager {
           }
         } else {
           if (start == nodes.length) {
-            throw new PathAlreadyExistException("Measurement node already exists");
+            throw new PathAlreadyExistException(levelPath);
           }
 
           if (checkResult.getResult(RocksDBMNodeType.MEASUREMENT)
               || checkResult.getResult(RocksDBMNodeType.ALISA)) {
-            throw new PathAlreadyExistException("Path contains measurement node");
+            throw new PathAlreadyExistException(levelPath);
           }
 
           if (start == nodes.length - 1) {
@@ -440,7 +431,7 @@ public class MRocksDBManager implements IMetaManager {
       while (!lockedLocks.isEmpty()) {
         lockedLocks.pop().unlock();
       }
-      throw new MetadataException("acquire lock timeout: " + levelPath);
+      throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
     }
   }
 
@@ -481,7 +472,7 @@ public class MRocksDBManager implements IMetaManager {
           lock.unlock();
         }
       } else {
-        throw new MetadataException("acquire lock timeout: " + levelPath);
+        throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
       }
     } else {
       readWriteHandler.executeBatch(batch);
@@ -559,7 +550,7 @@ public class MRocksDBManager implements IMetaManager {
               throw new PathAlreadyExistException(lockKey);
             }
           } else {
-            throw new MetadataException("acquire lock timeout: " + lockKey);
+            throw new AcquireLockTimeoutException("acquire lock timeout: " + lockKey);
           }
         }
         readWriteHandler.executeBatch(batch);
@@ -633,7 +624,7 @@ public class MRocksDBManager implements IMetaManager {
       while (!lockedLocks.isEmpty()) {
         lockedLocks.pop().unlock();
       }
-      throw new MetadataException("acquire lock timeout: " + levelPath);
+      throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
     }
   }
 
@@ -679,7 +670,7 @@ public class MRocksDBManager implements IMetaManager {
             lock.unlock();
           }
         } else {
-          throw new MetadataException("acquire lock timeout, " + p.getFullPath());
+          throw new AcquireLockTimeoutException("acquire lock timeout, " + p.getFullPath());
         }
 
         // delete parent node if is empty
@@ -713,7 +704,7 @@ public class MRocksDBManager implements IMetaManager {
               curLock.unlock();
             }
           } else {
-            throw new MetadataException("acquire lock timeout, " + curNode.getFullPath());
+            throw new AcquireLockTimeoutException("acquire lock timeout, " + curNode.getFullPath());
           }
         }
         // TODO: trigger engine update
@@ -819,7 +810,7 @@ public class MRocksDBManager implements IMetaManager {
             lock.unlock();
           }
         } else {
-          throw new MetadataException("acquire lock timeout: " + levelKey);
+          throw new AcquireLockTimeoutException("acquire lock timeout: " + levelKey);
         }
       }
     } catch (RocksDBException | InterruptedException e) {
@@ -901,7 +892,7 @@ public class MRocksDBManager implements IMetaManager {
           lock.unlock();
         }
       } else {
-        throw new MetadataException("acquire lock timeout: " + levelPath);
+        throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
       }
     } catch (InterruptedException | RocksDBException e) {
       throw new MetadataException(e);
@@ -1817,7 +1808,7 @@ public class MRocksDBManager implements IMetaManager {
                 }
                 batch.put(newAliasKey, RocksDBUtils.buildAliasNodeValue(originKey));
               } else {
-                throw new MetadataException("acquire lock timeout: " + newAliasLevel);
+                throw new AcquireLockTimeoutException("acquire lock timeout: " + newAliasLevel);
               }
 
               if (StringUtils.isNotEmpty(mNode.getAlias()) && !mNode.getAlias().equals(alias)) {
@@ -1836,7 +1827,7 @@ public class MRocksDBManager implements IMetaManager {
                   }
                   batch.delete(oldAliasKey);
                 } else {
-                  throw new MetadataException("acquire lock timeout: " + oldAliasLevel);
+                  throw new AcquireLockTimeoutException("acquire lock timeout: " + oldAliasLevel);
                 }
               }
               // TODO: need application lock
@@ -1877,7 +1868,7 @@ public class MRocksDBManager implements IMetaManager {
           rawKeyLock.unlock();
         }
       } else {
-        throw new MetadataException("acquire lock timeout: " + levelPath);
+        throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
       }
     } catch (RocksDBException | InterruptedException e) {
       throw new MetadataException(e);
@@ -1918,7 +1909,7 @@ public class MRocksDBManager implements IMetaManager {
           lock.unlock();
         }
       } else {
-        throw new MetadataException("acquire lock timeout: " + levelPath);
+        throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
       }
     } catch (RocksDBException | InterruptedException e) {
       throw new MetadataException(e);
@@ -1967,7 +1958,7 @@ public class MRocksDBManager implements IMetaManager {
           lock.unlock();
         }
       } else {
-        throw new MetadataException("acquire lock timeout: " + levelPath);
+        throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
       }
     } catch (RocksDBException | InterruptedException e) {
       throw new MetadataException(e);
@@ -2022,7 +2013,7 @@ public class MRocksDBManager implements IMetaManager {
           lock.unlock();
         }
       } else {
-        throw new MetadataException("acquire lock timeout: " + levelPath);
+        throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
       }
     } catch (RocksDBException | InterruptedException e) {
       throw new MetadataException(e);
@@ -2070,7 +2061,7 @@ public class MRocksDBManager implements IMetaManager {
           lock.unlock();
         }
       } else {
-        throw new MetadataException("acquire lock timeout: " + levelPath);
+        throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
       }
       if (!readWriteHandler.keyExist(key, holder)) {
         throw new PathNotExistException(path.getFullPath());
@@ -2125,7 +2116,7 @@ public class MRocksDBManager implements IMetaManager {
           lock.unlock();
         }
       } else {
-        throw new MetadataException("acquire lock timeout: " + levelPath);
+        throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
       }
     } catch (RocksDBException | InterruptedException e) {
       throw new MetadataException(e);
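
The reason this commit swaps the generic MetadataException for a dedicated
AcquireLockTimeoutException at every lock-timeout throw site is that callers can then tell a
transient timeout apart from a real metadata error; MetaDataTransfer relies on exactly that to
retry timed-out plans and persist only the rest as failed. A sketch of such selective handling,
using simplified stand-in exception classes and a hypothetical ThrowingAction interface:

    public class RetryOnTimeoutSketch {
      // simplified stand-ins for the classes under org.apache.iotdb.db.exception.metadata
      static class MetadataException extends Exception {
        MetadataException(String msg) { super(msg); }
      }

      static class AcquireLockTimeoutException extends MetadataException {
        AcquireLockTimeoutException(String msg) { super(msg); }
      }

      @FunctionalInterface
      interface ThrowingAction {
        void run() throws MetadataException;
      }

      // a timeout is treated as transient and retried once; anything else is reported right away
      static void runWithOneRetry(ThrowingAction action) {
        try {
          action.run();
        } catch (AcquireLockTimeoutException e) {
          try {
            action.run(); // second attempt, mirroring the retry in MetaDataTransfer
          } catch (MetadataException retryFailure) {
            System.out.println("failed after retry: " + retryFailure.getMessage());
          }
        } catch (MetadataException e) {
          System.out.println("failed, not retryable: " + e.getMessage());
        }
      }

      public static void main(String[] args) {
        runWithOneRetry(() -> { throw new AcquireLockTimeoutException("acquire lock timeout: root.sg"); });
        runWithOneRetry(() -> { throw new MetadataException("path already exists"); });
      }
    }
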
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
index b99ef5b..84e02dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
@@ -21,15 +21,28 @@ package org.apache.iotdb.db.metadata.rocksdb;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
+import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.logfile.MLogReader;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
-import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
-import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.mtree.MTree;
+import org.apache.iotdb.db.metadata.mtree.traverser.collector.MeasurementCollector;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
-import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +52,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 public class MetaDataTransfer {
 
@@ -47,31 +59,38 @@ public class MetaDataTransfer {
 
   private String mtreeSnapshotPath;
   private MRocksDBManager rocksDBManager;
+  private MLogWriter mLogWriter;
+  private String failedMLogPath =
+      IoTDBDescriptor.getInstance().getConfig().getSchemaDir()
+          + File.separator
+          + MetadataConstant.METADATA_LOG
+          + ".transfer_failed";
 
-  private AtomicInteger storageGroupToCreateCount = new AtomicInteger();
-  private AtomicLong timeSeriesToCreateCount = new AtomicLong();
-
-  private AtomicInteger createStorageGroupCount = new AtomicInteger();
-  private AtomicLong createTimeSeriesCount = new AtomicLong();
+  private AtomicInteger failedPlanCount = new AtomicInteger(0);
+  private List<PhysicalPlan> retryPlans = new ArrayList<>();
 
   MetaDataTransfer() throws MetadataException {
     rocksDBManager = new MRocksDBManager();
   }
 
-  public void init() throws IOException {
-    mtreeSnapshotPath =
-        IoTDBDescriptor.getInstance().getConfig().getSchemaDir()
-            + File.separator
-            + MetadataConstant.MTREE_SNAPSHOT;
+  public static void main(String[] args) {
+    try {
+      MetaDataTransfer transfer = new MetaDataTransfer();
+      transfer.doTransfer();
+    } catch (MetadataException | IOException e) {
+      e.printStackTrace();
+    }
+  }
 
-    File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
-    long time = System.currentTimeMillis();
-    if (mtreeSnapshot.exists()) {
-      transferFromSnapshot(mtreeSnapshot);
-      logger.debug(
-          "spend {} ms to deserialize mtree from snapshot", System.currentTimeMillis() - time);
+  public void doTransfer() throws IOException {
+    File failedFile = new File(failedMLogPath);
+    if (failedFile.exists()) {
+      failedFile.delete();
     }
 
+    mLogWriter = new MLogWriter(failedMLogPath);
+    mLogWriter.setLogNum(0);
+
     String schemaDir = IoTDBDescriptor.getInstance().getConfig().getSchemaDir();
     File schemaFolder = SystemFileFactory.INSTANCE.getFile(schemaDir);
     if (!schemaFolder.exists()) {
@@ -82,6 +101,14 @@ public class MetaDataTransfer {
       }
     }
 
+    mtreeSnapshotPath = schemaDir + File.separator + MetadataConstant.MTREE_SNAPSHOT;
+    File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
+    long time = System.currentTimeMillis();
+    if (mtreeSnapshot.exists()) {
+      transferFromSnapshot(mtreeSnapshot);
+      logger.info("spend {} ms to transfer data from snapshot", System.currentTimeMillis() - time);
+    }
+
     time = System.currentTimeMillis();
     String logFilePath = schemaDir + File.separator + MetadataConstant.METADATA_LOG;
     File logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
@@ -98,12 +125,17 @@ public class MetaDataTransfer {
       logger.info("no mlog.bin file find, skip transfer");
     }
 
-    logger.info("Do transfer success");
+    mLogWriter.close();
+
+    logger.info(
+        "transfer complete with {} failed plans; failed plans are persisted in mlog.bin.transfer_failed",
+        failedPlanCount.get());
   }
 
   private void transferFromMLog(MLogReader mLogReader) {
     int idx = 0;
     PhysicalPlan plan;
+    List<PhysicalPlan> nonCollisionCollections = new ArrayList<>();
     while (mLogReader.hasNext()) {
       try {
         plan = mLogReader.next();
@@ -116,45 +148,154 @@ public class MetaDataTransfer {
         continue;
       }
       try {
-        rocksDBManager.operation(plan);
+        switch (plan.getOperatorType()) {
+          case CREATE_TIMESERIES:
+          case CREATE_ALIGNED_TIMESERIES:
+          case AUTO_CREATE_DEVICE_MNODE:
+            nonCollisionCollections.add(plan);
+            if (nonCollisionCollections.size() > 100000) {
+              executeOperation(nonCollisionCollections, true);
+            }
+            break;
+          case DELETE_TIMESERIES:
+          case SET_STORAGE_GROUP:
+          case DELETE_STORAGE_GROUP:
+          case TTL:
+          case CHANGE_ALIAS:
+            executeOperation(nonCollisionCollections, true);
+            rocksDBManager.operation(plan);
+            break;
+          case CHANGE_TAG_OFFSET:
+          case CREATE_TEMPLATE:
+          case DROP_TEMPLATE:
+          case APPEND_TEMPLATE:
+          case PRUNE_TEMPLATE:
+          case SET_TEMPLATE:
+          case ACTIVATE_TEMPLATE:
+          case UNSET_TEMPLATE:
+          case CREATE_CONTINUOUS_QUERY:
+          case DROP_CONTINUOUS_QUERY:
+            logger.error("unsupported operations {}", plan.toString());
+            break;
+          default:
+            logger.error("Unrecognizable command {}", plan.getOperatorType());
+        }
       } catch (MetadataException | IOException e) {
         logger.error("Can not operate cmd {} for err:", plan.getOperatorType(), e);
+        if (!(e instanceof StorageGroupAlreadySetException)
+            && !(e instanceof PathAlreadyExistException)
+            && !(e instanceof AliasAlreadyExistException)) {
+          persistFailedLog(plan);
+        }
+      }
+    }
+    executeOperation(nonCollisionCollections, true);
+    if (retryPlans.size() > 0) {
+      executeOperation(retryPlans, false);
+    }
+  }
+
+  private void executeOperation(List<PhysicalPlan> plans, boolean needsToRetry) {
+    plans
+        .parallelStream()
+        .forEach(
+            x -> {
+              try {
+                rocksDBManager.operation(x);
+              } catch (IOException e) {
+                logger.error("failed to operate plan: {}", x.toString(), e);
+                retryPlans.add(x);
+              } catch (MetadataException e) {
+                logger.error("failed to operate plan: {}", x.toString(), e);
+                if (e instanceof AcquireLockTimeoutException && needsToRetry) {
+                  retryPlans.add(x);
+                } else {
+                  persistFailedLog(x);
+                }
+              } catch (Exception e) {
+                if (needsToRetry) {
+                  retryPlans.add(x);
+                } else {
+                  persistFailedLog(x);
+                }
+              }
+            });
+    logger.info("parallel executed {} operations", plans.size());
+    plans.clear();
+  }
+
+  private void persistFailedLog(PhysicalPlan plan) {
+    logger.info("persisting plan that failed and will not be retried: {}", plan.toString());
+    failedPlanCount.incrementAndGet();
+    try {
+      switch (plan.getOperatorType()) {
+        case CREATE_TIMESERIES:
+          mLogWriter.createTimeseries((CreateTimeSeriesPlan) plan);
+          break;
+        case CREATE_ALIGNED_TIMESERIES:
+          mLogWriter.createAlignedTimeseries((CreateAlignedTimeSeriesPlan) plan);
+          break;
+        case AUTO_CREATE_DEVICE_MNODE:
+          mLogWriter.autoCreateDeviceMNode((AutoCreateDeviceMNodePlan) plan);
+          break;
+        case DELETE_TIMESERIES:
+          mLogWriter.deleteTimeseries((DeleteTimeSeriesPlan) plan);
+          break;
+        case SET_STORAGE_GROUP:
+          SetStorageGroupPlan setStorageGroupPlan = (SetStorageGroupPlan) plan;
+          mLogWriter.setStorageGroup(setStorageGroupPlan.getPath());
+          break;
+        case DELETE_STORAGE_GROUP:
+          DeleteStorageGroupPlan deletePlan = (DeleteStorageGroupPlan) plan;
+          for (PartialPath path : deletePlan.getPaths()) {
+            mLogWriter.deleteStorageGroup(path);
+          }
+          break;
+        case TTL:
+          SetTTLPlan ttlPlan = (SetTTLPlan) plan;
+          mLogWriter.setTTL(ttlPlan.getStorageGroup(), ttlPlan.getDataTTL());
+          break;
+        case CHANGE_ALIAS:
+          ChangeAliasPlan changeAliasPlan = (ChangeAliasPlan) plan;
+          mLogWriter.changeAlias(changeAliasPlan.getPath(), changeAliasPlan.getAlias());
+          break;
+        case CHANGE_TAG_OFFSET:
+        case CREATE_TEMPLATE:
+        case DROP_TEMPLATE:
+        case APPEND_TEMPLATE:
+        case PRUNE_TEMPLATE:
+        case SET_TEMPLATE:
+        case ACTIVATE_TEMPLATE:
+        case UNSET_TEMPLATE:
+        case CREATE_CONTINUOUS_QUERY:
+        case DROP_CONTINUOUS_QUERY:
+          throw new UnsupportedOperationException(plan.getOperatorType().toString());
+        default:
+          logger.error("Unrecognizable command {}", plan.getOperatorType());
       }
+    } catch (IOException e) {
+      logger.error(
+          "fatal error: exception while persisting a failed plan, the metadata transfer should be considered failed", e);
     }
   }
 
   public void transferFromSnapshot(File mtreeSnapshot) {
     try (MLogReader mLogReader = new MLogReader(mtreeSnapshot)) {
       doTransferFromSnapshot(mLogReader);
-    } catch (IOException e) {
+    } catch (IOException | MetadataException e) {
       logger.warn("Failed to deserialize from {}. Use a new MTree.", mtreeSnapshot.getPath());
     }
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  private void doTransferFromSnapshot(MLogReader mLogReader) {
+  private void doTransferFromSnapshot(MLogReader mLogReader) throws IOException, MetadataException {
     long start = System.currentTimeMillis();
-    List<IMeasurementMNode> measurementList = new ArrayList<>();
-    List<StorageGroupMNode> sgList = new ArrayList<>();
-    while (mLogReader.hasNext()) {
-      PhysicalPlan plan = null;
-      try {
-        plan = mLogReader.next();
-        if (plan == null) {
-          continue;
-        }
-        if (plan instanceof StorageGroupMNodePlan) {
-          sgList.add(StorageGroupMNode.deserializeFrom((StorageGroupMNodePlan) plan));
-        } else if (plan instanceof MeasurementMNodePlan) {
-          measurementList.add(MeasurementMNode.deserializeFrom((MeasurementMNodePlan) plan));
-        }
-      } catch (Exception e) {
-        logger.error(
-            "Can not operate cmd {} for err:", plan == null ? "" : plan.getOperatorType(), e);
-      }
-    }
+    MTree mTree = new MTree();
+    mTree.init();
+    List<IStorageGroupMNode> storageGroupNodes = mTree.getAllStorageGroupNodes();
 
-    sgList
+    AtomicInteger errorCount = new AtomicInteger(0);
+    storageGroupNodes
         .parallelStream()
         .forEach(
             sgNode -> {
@@ -164,22 +305,61 @@ public class MetaDataTransfer {
                   rocksDBManager.setTTL(sgNode.getPartialPath(), sgNode.getDataTTL());
                 }
               } catch (MetadataException | IOException e) {
-                logger.error("");
+                if (!(e instanceof StorageGroupAlreadySetException)
+                    && !(e instanceof PathAlreadyExistException)
+                    && !(e instanceof AliasAlreadyExistException)) {
+                  errorCount.incrementAndGet();
+                }
+                logger.error(
+                    "create storage group {} failed", sgNode.getPartialPath().getFullPath(), e);
               }
             });
 
-    measurementList
+    if (errorCount.get() > 0) {
+      logger.info("Fatal error: failed to create some storage groups, terminating metadata transfer");
+      return;
+    }
+
+    List<IMeasurementMNode> measurementMNodes = new ArrayList<>();
+
+    MeasurementCollector collector =
+        new MeasurementCollector(
+            mTree.getNodeByPath(new PartialPath("root")), new PartialPath("root.**"), -1, -1) {
+          @Override
+          protected void collectMeasurement(IMeasurementMNode node) throws MetadataException {
+            measurementMNodes.add(node);
+          }
+        };
+    collector.traverse();
+
+    measurementMNodes
         .parallelStream()
         .forEach(
             mNode -> {
               try {
                 rocksDBManager.createTimeSeries(
                     mNode.getPartialPath(), mNode.getSchema(), mNode.getAlias(), null, null);
+              } catch (AcquireLockTimeoutException e) {
+                try {
+                  rocksDBManager.createTimeSeries(
+                      mNode.getPartialPath(), mNode.getSchema(), mNode.getAlias(), null, null);
+                } catch (MetadataException metadataException) {
+                  logger.error(
+                      "create timeseries {} failed in retry",
+                      mNode.getPartialPath().getFullPath(),
+                      e);
+                  errorCount.incrementAndGet();
+                }
               } catch (MetadataException e) {
-                logger.error("");
+                logger.error(
+                    "create timeseries {} failed", mNode.getPartialPath().getFullPath(), e);
+                errorCount.incrementAndGet();
               }
             });
 
-    logger.info("snapshot transfer complete after {}ms", System.currentTimeMillis() - start);
+    logger.info(
+        "metadata snapshot transfer complete after {}ms with {} errors",
+        System.currentTimeMillis() - start,
+        errorCount.get());
   }
 }
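
transferFromMLog above buffers the plans that cannot conflict with each other (timeseries and
device creations), flushes the buffer through a parallel stream once it grows past 100000 entries
or an ordering-sensitive plan (delete, set storage group, TTL, alias change) arrives, and keeps
failures aside for a retry pass. A compact sketch of that batching loop over generic work items;
the threshold, the item type and the method names are illustrative.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.function.Consumer;
    import java.util.function.Predicate;

    public class BatchedReplaySketch {
      private static final int FLUSH_THRESHOLD = 5; // the real transfer flushes at 100000

      // items that failed in the parallel pass are kept for a later retry, like retryPlans above
      private final List<String> failed = new CopyOnWriteArrayList<>();

      void replay(List<String> plans, Predicate<String> isOrderSensitive, Consumer<String> apply) {
        List<String> batch = new ArrayList<>();
        for (String plan : plans) {
          if (isOrderSensitive.test(plan)) {
            flush(batch, apply);      // everything buffered so far must land first
            applySafely(plan, apply); // then the ordering-sensitive plan runs on its own
          } else {
            batch.add(plan);
            if (batch.size() >= FLUSH_THRESHOLD) {
              flush(batch, apply);
            }
          }
        }
        flush(batch, apply);
        failed.forEach(plan -> applySafely(plan, apply)); // single retry pass at the end
      }

      private void flush(List<String> batch, Consumer<String> apply) {
        batch.parallelStream().forEach(plan -> applySafely(plan, apply));
        batch.clear();
      }

      private void applySafely(String plan, Consumer<String> apply) {
        try {
          apply.accept(plan);
        } catch (RuntimeException e) {
          failed.add(plan);
        }
      }

      public static void main(String[] args) {
        List<String> plans =
            List.of("create d1.s1", "create d1.s2", "set sg root.sg", "create d2.s1");
        new BatchedReplaySketch()
            .replay(plans, p -> p.startsWith("set sg"), p -> System.out.println("apply " + p));
      }
    }
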
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
index 64359f9..9ab5f42 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
@@ -92,7 +92,7 @@ public class RocksDBReadWriteHandler {
     Options options = new Options();
     options.setCreateIfMissing(true);
     options.setAllowMmapReads(true);
-    options.setRowCache(new LRUCache(900000));
+    options.setRowCache(new LRUCache(9000000));
     options.setDbWriteBufferSize(16 * 1024 * 1024);
 
     org.rocksdb.Logger rocksDBLogger = new RockDBLogger(options, logger);
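
The only change in this hunk is the row cache capacity. org.rocksdb.LRUCache takes its capacity as
a byte budget, so the bump from 900000 to 9000000 grows the row cache from roughly 0.9 MB to
roughly 9 MB of cached rows rather than to nine million entries. A standalone sketch of the same
option block, with an illustrative database path:

    import org.rocksdb.LRUCache;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class RocksDbOptionsSketch {
      public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options =
                new Options()
                    .setCreateIfMissing(true)
                    .setAllowMmapReads(true)
                    // capacity is in bytes: 9_000_000 is about 9 MB of cached rows
                    .setRowCache(new LRUCache(9_000_000))
                    .setDbWriteBufferSize(16 * 1024 * 1024);
            RocksDB db = RocksDB.open(options, "/tmp/rocksdb-options-demo")) {
          db.put("k".getBytes(), "v".getBytes());
        }
      }
    }
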
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBBenchmark.java
similarity index 86%
rename from server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBTest.java
rename to server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBBenchmark.java
index b255f31..8f054a5 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBBenchmark.java
@@ -13,20 +13,20 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
-public class MRocksDBTest {
+public class MRocksDBBenchmark {
   protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private MRocksDBManager rocksDBManager;
 
-  public MRocksDBTest(MRocksDBManager rocksDBManager) {
+  public MRocksDBBenchmark(MRocksDBManager rocksDBManager) {
     this.rocksDBManager = rocksDBManager;
   }
 
-  public List<RocksDBTestTask.BenchmarkResult> benchmarkResults = new ArrayList<>();
+  public List<RocksDBBenchmarkTask.BenchmarkResult> benchmarkResults = new ArrayList<>();
 
   public void testStorageGroupCreation(List<SetStorageGroupPlan> storageGroups) {
-    RocksDBTestTask<SetStorageGroupPlan> task =
-        new RocksDBTestTask<>(storageGroups, RocksDBTestUtils.WRITE_CLIENT_NUM, 100);
-    RocksDBTestTask.BenchmarkResult result =
+    RocksDBBenchmarkTask<SetStorageGroupPlan> task =
+        new RocksDBBenchmarkTask<>(storageGroups, RocksDBTestUtils.WRITE_CLIENT_NUM, 100);
+    RocksDBBenchmarkTask.BenchmarkResult result =
         task.runWork(
             setStorageGroupPlan -> {
               try {
@@ -42,12 +42,12 @@ public class MRocksDBTest {
 
   public void testTimeSeriesCreation(List<List<CreateTimeSeriesPlan>> timeSeriesSet)
       throws IOException {
-    RocksDBTestTask<List<CreateTimeSeriesPlan>> task =
-        new RocksDBTestTask<>(timeSeriesSet, RocksDBTestUtils.WRITE_CLIENT_NUM, 100);
-    RocksDBTestTask.BenchmarkResult result =
+    RocksDBBenchmarkTask<List<CreateTimeSeriesPlan>> task =
+        new RocksDBBenchmarkTask<>(timeSeriesSet, RocksDBTestUtils.WRITE_CLIENT_NUM, 100);
+    RocksDBBenchmarkTask.BenchmarkResult result =
         task.runBatchWork(
             createTimeSeriesPlans -> {
-              RocksDBTestTask.TaskResult taskResult = new RocksDBTestTask.TaskResult();
+              RocksDBBenchmarkTask.TaskResult taskResult = new RocksDBBenchmarkTask.TaskResult();
               createTimeSeriesPlans.stream()
                   .forEach(
                       ts -> {
@@ -92,9 +92,9 @@ public class MRocksDBTest {
   //  }
 
   public void testNodeChildrenQuery(Collection<String> queryTsSet) {
-    RocksDBTestTask<String> task =
-        new RocksDBTestTask<>(queryTsSet, RocksDBTestUtils.WRITE_CLIENT_NUM, 10000);
-    RocksDBTestTask.BenchmarkResult result =
+    RocksDBBenchmarkTask<String> task =
+        new RocksDBBenchmarkTask<>(queryTsSet, RocksDBTestUtils.WRITE_CLIENT_NUM, 10000);
+    RocksDBBenchmarkTask.BenchmarkResult result =
         task.runWork(
             s -> {
               try {
@@ -121,8 +121,8 @@ public class MRocksDBTest {
     List<PartialPath> level4 = rocksDBManager.getNodesListInGivenLevel(null, 4);
     List<PartialPath> level5 = rocksDBManager.getNodesListInGivenLevel(null, 5);
     long totalCount = level1.size() + level2.size() + level3.size() + level4.size() + level5.size();
-    RocksDBTestTask.BenchmarkResult result =
-        new RocksDBTestTask.BenchmarkResult(
+    RocksDBBenchmarkTask.BenchmarkResult result =
+        new RocksDBBenchmarkTask.BenchmarkResult(
             "levelScan", totalCount, 0, System.currentTimeMillis() - start);
     benchmarkResults.add(result);
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBTestEngine.java b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBBenchmarkEngine.java
similarity index 89%
rename from server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBTestEngine.java
rename to server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBBenchmarkEngine.java
index ba124c1..f11ed07 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBTestEngine.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBBenchmarkEngine.java
@@ -8,7 +8,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.logfile.MLogReader;
-import org.apache.iotdb.db.metadata.logfile.MLogWriter;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -30,20 +29,19 @@ import java.util.Set;
 
 import static org.apache.iotdb.db.metadata.rocksdb.RocksDBReadWriteHandler.ROCKSDB_PATH;
 
-public class RocksDBTestEngine {
+public class RocksDBBenchmarkEngine {
   private static final Logger logger = LoggerFactory.getLogger(MManager.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private static final int BIN_CAPACITY = 100 * 1000;
 
-  private MLogWriter logWriter;
   private File logFile;
   public static List<List<CreateTimeSeriesPlan>> timeSeriesSet = new ArrayList<>();
   public static Set<String> measurementPathSet = new HashSet<>();
   public static Set<String> innerPathSet = new HashSet<>();
   public static List<SetStorageGroupPlan> storageGroups = new ArrayList<>();
 
-  public RocksDBTestEngine() {
+  public RocksDBBenchmarkEngine() {
     String schemaDir = config.getSchemaDir();
     String logFilePath = schemaDir + File.separator + MetadataConstant.METADATA_LOG;
     logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
@@ -58,10 +56,10 @@ public class RocksDBTestEngine {
           storageGroups, timeSeriesSet, measurementPathSet, innerPathSet);
       /** rocksdb benchmark * */
       MRocksDBManager rocksDBManager = new MRocksDBManager();
-      MRocksDBTest mRocksDBTest = new MRocksDBTest(rocksDBManager);
-      mRocksDBTest.testStorageGroupCreation(storageGroups);
-      mRocksDBTest.testTimeSeriesCreation(timeSeriesSet);
-      RocksDBTestUtils.printReport(mRocksDBTest.benchmarkResults, "rocksDB");
+      MRocksDBBenchmark mRocksDBBenchmark = new MRocksDBBenchmark(rocksDBManager);
+      mRocksDBBenchmark.testStorageGroupCreation(storageGroups);
+      mRocksDBBenchmark.testTimeSeriesCreation(timeSeriesSet);
+      RocksDBTestUtils.printReport(mRocksDBBenchmark.benchmarkResults, "rocksDB");
       RocksDBTestUtils.printMemInfo("Benchmark finished");
     } catch (IOException | MetadataException e) {
       logger.error("Error happened when run benchmark", e);
@@ -70,8 +68,6 @@ public class RocksDBTestEngine {
 
   public void prepareBenchmark() throws IOException {
     long time = System.currentTimeMillis();
-    logWriter = new MLogWriter(config.getSchemaDir(), MetadataConstant.METADATA_LOG + ".temp");
-    logWriter.setLogNum(0);
     if (!logFile.exists()) {
       throw new FileNotFoundException("we need a mlog.bin to init the benchmark test");
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBTestTask.java b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBBenchmarkTask.java
similarity index 95%
rename from server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBTestTask.java
rename to server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBBenchmarkTask.java
index 8c1cfbf..a628783 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBTestTask.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBBenchmarkTask.java
@@ -9,12 +9,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
-public class RocksDBTestTask<T> {
+public class RocksDBBenchmarkTask<T> {
   private Collection<T> dataSet;
   private int workCount;
   private int timeoutInMin;
 
-  RocksDBTestTask(Collection<T> dataSet, int workCount, int timeoutInMin) {
+  RocksDBBenchmarkTask(Collection<T> dataSet, int workCount, int timeoutInMin) {
     this.dataSet = dataSet;
     this.workCount = workCount;
     this.timeoutInMin = timeoutInMin;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBTestUtils.java b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBTestUtils.java
index 65609c4..4c75ae7 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBTestUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBTestUtils.java
@@ -37,13 +37,14 @@ public class RocksDBTestUtils {
         stageInfo, Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
   }
 
-  public static void printReport(List<RocksDBTestTask.BenchmarkResult> results, String category) {
+  public static void printReport(
+      List<RocksDBBenchmarkTask.BenchmarkResult> results, String category) {
     System.out.println(
         String.format(
             "\n\n#################################%s benchmark statistics#################################",
             category));
     System.out.println(String.format("%25s %15s %10s %15s", "", "success", "fail", "cost-in-ms"));
-    for (RocksDBTestTask.BenchmarkResult result : results) {
+    for (RocksDBBenchmarkTask.BenchmarkResult result : results) {
       System.out.println(
           String.format(
               "%25s %15d %10d %15d",

[iotdb] 04/45: traverser changed to concurrency

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a63cc7df3782a5931e98f66c3417ec94d3682ad7
Author: lisijia <li...@360.cn>
AuthorDate: Fri Mar 4 10:40:46 2022 +0800

    traverser changed to concurrency
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 43 +++-------------------
 .../iotdb/db/metadata/rocksdb/RocksDBUtils.java    | 35 ++++++++++++++++++
 2 files changed, 41 insertions(+), 37 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index fc7a066..603c07f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -129,9 +129,6 @@ import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEA
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
-import static org.apache.iotdb.db.metadata.rocksdb.RocksDBUtils.getAllCompoundMode;
-import static org.apache.iotdb.db.metadata.rocksdb.RocksDBUtils.newStringArray;
-import static org.apache.iotdb.db.metadata.rocksdb.RocksDBUtils.replaceWildcard;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 /**
@@ -960,38 +957,10 @@ public class MRocksDBManager implements IMetaManager {
     return getCountByNodeType(new Character[] {NODE_TYPE_MEASUREMENT}, pathPattern.getNodes());
   }
 
-  // eg. root.a.*.**.b.**.c
-  public void replaceMultiWildcard(
-      String[] nodes, int maxLevel, Consumer<String> consumer, Character[] nodeTypeArray)
-      throws IllegalPathException {
-    List<Integer> multiWildcardPosition = new ArrayList<>();
-    for (int i = 0; i < nodes.length; i++) {
-      if (MULTI_LEVEL_PATH_WILDCARD.equals(nodes[i])) {
-        multiWildcardPosition.add(i);
-      }
-    }
-    if (multiWildcardPosition.isEmpty()) {
-      traverseByPatternPath(nodes, consumer, nodeTypeArray);
-    } else if (multiWildcardPosition.size() == 1) {
-      for (int i = 1; i <= maxLevel - nodes.length + 2; i++) {
-        String[] clone = nodes.clone();
-        clone[multiWildcardPosition.get(0)] = replaceWildcard(i);
-        traverseByPatternPath(newStringArray(clone), consumer, nodeTypeArray);
-      }
-    } else {
-      for (int sum = multiWildcardPosition.size();
-          sum <= maxLevel - (nodes.length - multiWildcardPosition.size() - 1);
-          sum++) {
-        List<int[]> result = getAllCompoundMode(sum, multiWildcardPosition.size());
-        for (int[] value : result) {
-          String[] clone = nodes.clone();
-          for (int i = 0; i < value.length; i++) {
-            clone[multiWildcardPosition.get(i)] = replaceWildcard(value[i]);
-          }
-          traverseByPatternPath(newStringArray(clone), consumer, nodeTypeArray);
-        }
-      }
-    }
+  public void traverseOutcomeBasins(
+      String[] nodes, int maxLevel, Consumer<String> consumer, Character[] nodeTypeArray) {
+    List<String[]> allNodesArray = RocksDBUtils.replaceMultiWildcardToSingle(nodes, maxLevel);
+    allNodesArray.parallelStream().forEach(x -> traverseByPatternPath(x, consumer, nodeTypeArray));
   }
 
   public void traverseByPatternPath(
@@ -1184,7 +1153,7 @@ public class MRocksDBManager implements IMetaManager {
     AtomicInteger atomicInteger = new AtomicInteger(0);
     Consumer<String> consumer = s -> atomicInteger.incrementAndGet();
 
-    replaceMultiWildcard(nodes, MAX_PATH_DEPTH, consumer, nodetype);
+    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, consumer, nodetype);
     return atomicInteger.get();
   }
 
@@ -1440,7 +1409,7 @@ public class MRocksDBManager implements IMetaManager {
     List<String> allResult = Collections.synchronizedList(new ArrayList<>());
     Consumer<String> consumer = allResult::add;
 
-    replaceMultiWildcard(nodes, MAX_PATH_DEPTH, consumer, nodetype);
+    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, consumer, nodetype);
 
     for (String path : allResult) {
       collection.add(new PartialPath(path));
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index 7cf4b76..1e9e365 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_ROOT;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ALIAS;
@@ -507,4 +508,38 @@ public class RocksDBUtils {
     }
     return allResult;
   }
+
+  // eg. root.a.*.**.b.**.c
+  public static List<String[]> replaceMultiWildcardToSingle(String[] nodes, int maxLevel) {
+    List<String[]> allNodesArray = new ArrayList<>();
+    List<Integer> multiWildcardPosition = new ArrayList<>();
+    for (int i = 0; i < nodes.length; i++) {
+      if (MULTI_LEVEL_PATH_WILDCARD.equals(nodes[i])) {
+        multiWildcardPosition.add(i);
+      }
+    }
+    if (multiWildcardPosition.isEmpty()) {
+      allNodesArray.add(nodes);
+    } else if (multiWildcardPosition.size() == 1) {
+      for (int i = 1; i <= maxLevel - nodes.length + 2; i++) {
+        String[] clone = nodes.clone();
+        clone[multiWildcardPosition.get(0)] = replaceWildcard(i);
+        allNodesArray.add(clone);
+      }
+    } else {
+      for (int sum = multiWildcardPosition.size();
+          sum <= maxLevel - (nodes.length - multiWildcardPosition.size() - 1);
+          sum++) {
+        List<int[]> result = getAllCompoundMode(sum, multiWildcardPosition.size());
+        for (int[] value : result) {
+          String[] clone = nodes.clone();
+          for (int i = 0; i < value.length; i++) {
+            clone[multiWildcardPosition.get(i)] = replaceWildcard(value[i]);
+          }
+          allNodesArray.add(clone);
+        }
+      }
+    }
+    return allNodesArray;
+  }
 }
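
The extracted helper is what lets the new traverseOutcomeBasins fan out: a pattern containing "**" is rewritten into a bounded family of fixed-depth candidates, which are then walked with parallelStream(). The exact slot contents depend on replaceWildcard(), which is not part of this patch, so the expansion below is indicative only.

    // Indicative sketch: with maxLevel = 5 and a 4-node pattern, the loop runs i = 1..3,
    // so the single "**" slot is rewritten three times, conceptually producing
    //   root.a.*.c, root.a.*.*.c, root.a.*.*.*.c
    String[] nodes = new String[] {"root", "a", "**", "c"};
    List<String[]> expanded = RocksDBUtils.replaceMultiWildcardToSingle(nodes, 5);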

[iotdb] 15/45: fix timeout issue when obtaining lock

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9640cf17566e9301b6f6d7a18bb09006ad2f9564
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 9 15:25:30 2022 +0800

    fix timeout issue when obtaining lock
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 39 +++++++++++-----------
 1 file changed, 20 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index a1f5aa9..7b0b5f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -85,7 +85,7 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
 
-import com.google.common.util.concurrent.Striped;
+import com.google.common.collect.MapMaker;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.rocksdb.Holder;
@@ -114,6 +114,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -182,8 +183,8 @@ public class MRocksDBManager implements IMetaManager {
 
   private RocksDBReadWriteHandler readWriteHandler;
 
-  // TODO: check how Stripped Lock consume memory
-  private Striped<Lock> locksPool = Striped.lazyWeakLock(10000);
+  private final Map<String, ReentrantLock> locksPool =
+      new MapMaker().weakValues().initialCapacity(10000).makeMap();
 
   private volatile Map<String, Boolean> storageGroupDeletingFlagMap = new ConcurrentHashMap<>();
 
@@ -385,7 +386,7 @@ public class MRocksDBManager implements IMetaManager {
       return;
     }
     String levelPath = RocksDBUtils.getLevelPath(nodes, start - 1);
-    Lock lock = locksPool.get(levelPath);
+    Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
     if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
       lockedLocks.push(lock);
       try {
@@ -468,7 +469,7 @@ public class MRocksDBManager implements IMetaManager {
       aliasNodes[nodes.length - 1] = alias;
       String aliasLevelPath = RocksDBUtils.getLevelPath(aliasNodes, aliasNodes.length - 1);
       byte[] aliasNodeKey = RocksDBUtils.toAliasNodeKey(aliasLevelPath);
-      Lock lock = locksPool.get(aliasLevelPath);
+      Lock lock = locksPool.computeIfAbsent(aliasLevelPath, x -> new ReentrantLock());
       if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
         try {
           if (!readWriteHandler.keyExistByAllTypes(aliasLevelPath).existAnyKey()) {
@@ -552,7 +553,7 @@ public class MRocksDBManager implements IMetaManager {
       Stack<Lock> acquiredLock = new Stack<>();
       try {
         for (String lockKey : locks) {
-          Lock lock = locksPool.get(lockKey);
+          Lock lock = locksPool.computeIfAbsent(lockKey, x -> new ReentrantLock());
           if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
             acquiredLock.push(lock);
             if (readWriteHandler.keyExistByAllTypes(lockKey).existAnyKey()) {
@@ -591,7 +592,7 @@ public class MRocksDBManager implements IMetaManager {
       return;
     }
     String levelPath = RocksDBUtils.getLevelPath(nodes, start - 1);
-    Lock lock = locksPool.get(levelPath);
+    Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
     if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
       try {
         CheckKeyResult checkResult = readWriteHandler.keyExistByAllTypes(levelPath);
@@ -657,7 +658,7 @@ public class MRocksDBManager implements IMetaManager {
 
         // Delete measurement node
         String mLevelPath = RocksDBUtils.getLevelPath(p.getNodes(), p.getNodeLength() - 1);
-        Lock lock = locksPool.get(mLevelPath);
+        Lock lock = locksPool.computeIfAbsent(mLevelPath, x -> new ReentrantLock());
         RMeasurementMNode deletedNode;
         if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
           try {
@@ -687,7 +688,7 @@ public class MRocksDBManager implements IMetaManager {
           PartialPath curPath = curNode.getPartialPath();
           String curLevelPath =
               RocksDBUtils.getLevelPath(curPath.getNodes(), curPath.getNodeLength() - 1);
-          Lock curLock = locksPool.get(curLevelPath);
+          Lock curLock = locksPool.computeIfAbsent(curLevelPath, x -> new ReentrantLock());
           if (curLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
             try {
               IMNode toDelete = curNode.getParent();
@@ -799,7 +800,7 @@ public class MRocksDBManager implements IMetaManager {
       int len = nodes.length;
       for (int i = 1; i < nodes.length; i++) {
         String levelKey = RocksDBUtils.getLevelPath(nodes, i);
-        Lock lock = locksPool.get(levelKey);
+        Lock lock = locksPool.computeIfAbsent(levelKey, x -> new ReentrantLock());
         if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
           try {
             CheckKeyResult keyCheckResult = readWriteHandler.keyExistByAllTypes(levelKey);
@@ -886,7 +887,7 @@ public class MRocksDBManager implements IMetaManager {
     byte[] pathKey = RocksDBUtils.toStorageNodeKey(levelPath);
     Holder<byte[]> holder = new Holder<>();
     try {
-      Lock lock = locksPool.get(levelPath);
+      Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
       if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
         try {
           if (readWriteHandler.keyExist(pathKey, holder)) {
@@ -1862,7 +1863,7 @@ public class MRocksDBManager implements IMetaManager {
     String levelPath = RocksDBUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
     byte[] originKey = RocksDBUtils.toMeasurementNodeKey(levelPath);
     try {
-      Lock rawKeyLock = locksPool.get(levelPath);
+      Lock rawKeyLock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
       if (rawKeyLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
         try {
           String[] nodes = path.getNodes();
@@ -1875,7 +1876,7 @@ public class MRocksDBManager implements IMetaManager {
             String newAliasLevel = RocksDBUtils.getLevelPath(newAlias, newAlias.length - 1);
             byte[] newAliasKey = RocksDBUtils.toAliasNodeKey(newAliasLevel);
 
-            Lock newAliasLock = locksPool.get(newAliasLevel);
+            Lock newAliasLock = locksPool.computeIfAbsent(newAliasLevel, x -> new ReentrantLock());
             Lock oldAliasLock = null;
             boolean lockedOldAlias = false;
             try {
@@ -1893,7 +1894,7 @@ public class MRocksDBManager implements IMetaManager {
                 oldAlias[nodes.length - 1] = mNode.getAlias();
                 String oldAliasLevel = RocksDBUtils.getLevelPath(oldAlias, oldAlias.length - 1);
                 byte[] oldAliasKey = RocksDBUtils.toAliasNodeKey(oldAliasLevel);
-                oldAliasLock = locksPool.get(oldAliasLevel);
+                oldAliasLock = locksPool.computeIfAbsent(oldAliasLevel, x -> new ReentrantLock());
                 if (oldAliasLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
                   lockedOldAlias = true;
                   if (!readWriteHandler.keyExist(oldAliasKey)) {
@@ -1962,7 +1963,7 @@ public class MRocksDBManager implements IMetaManager {
     byte[] key = RocksDBUtils.toMeasurementNodeKey(levelPath);
     Holder<byte[]> holder = new Holder<>();
     try {
-      Lock lock = locksPool.get(levelPath);
+      Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
       if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
         try {
           if (!readWriteHandler.keyExist(key, holder)) {
@@ -2004,7 +2005,7 @@ public class MRocksDBManager implements IMetaManager {
     byte[] key = RocksDBUtils.toMeasurementNodeKey(levelPath);
     Holder<byte[]> holder = new Holder<>();
     try {
-      Lock lock = locksPool.get(levelPath);
+      Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
       if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
         try {
           if (!readWriteHandler.keyExist(key, holder)) {
@@ -2052,7 +2053,7 @@ public class MRocksDBManager implements IMetaManager {
     byte[] key = RocksDBUtils.toMeasurementNodeKey(levelPath);
     Holder<byte[]> holder = new Holder<>();
     try {
-      Lock lock = locksPool.get(levelPath);
+      Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
       if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
         try {
           if (!readWriteHandler.keyExist(key, holder)) {
@@ -2108,7 +2109,7 @@ public class MRocksDBManager implements IMetaManager {
     byte[] key = RocksDBUtils.toMeasurementNodeKey(levelPath);
     Holder<byte[]> holder = new Holder<>();
     try {
-      Lock lock = locksPool.get(levelPath);
+      Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
       if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
         try {
           byte[] originValue = holder.getValue();
@@ -2159,7 +2160,7 @@ public class MRocksDBManager implements IMetaManager {
     byte[] nodeKey = RocksDBUtils.toMeasurementNodeKey(levelPath);
     Holder<byte[]> holder = new Holder<>();
     try {
-      Lock lock = locksPool.get(levelPath);
+      Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
       if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
         try {
           if (!readWriteHandler.keyExist(nodeKey, holder)) {

[iotdb] 19/45: fix incorrect storagegroup name in show timeseries

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 311ad3619b9f29afd7c79d7ba4939e1be18573a3
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 9 19:11:45 2022 +0800

    fix incorrect storagegroup name in show timeseries
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 46 ++++++++++++++++++----
 .../iotdb/db/metadata/rocksdb/RocksDBUtils.java    |  8 ++++
 2 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 3161903..7ae7e50 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -86,6 +86,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
 
 import com.google.common.collect.MapMaker;
+import io.netty.util.internal.StringUtil;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.rocksdb.Holder;
@@ -814,11 +815,8 @@ public class MRocksDBManager implements IMetaManager {
               }
             } else {
               boolean hasChild = !keyCheckResult.getResult(RocksDBMNodeType.STORAGE_GROUP);
-              StringBuilder stringBuilder = new StringBuilder();
-              for (int j = 0; j <= i; j++) {
-                stringBuilder.append(RockDBConstants.PATH_SEPARATOR).append(nodes[j]);
-              }
-              throw new StorageGroupAlreadySetException(stringBuilder.substring(1), hasChild);
+              throw new StorageGroupAlreadySetException(
+                  RocksDBUtils.concatNodesName(nodes, 0, i), hasChild);
             }
           } finally {
             lock.unlock();
@@ -1534,13 +1532,42 @@ public class MRocksDBManager implements IMetaManager {
     BiFunction<byte[], byte[], Boolean> function =
         (a, b) -> {
           String fullPath = RocksDBUtils.getPathByInnerName(new String(a));
-          res.add(new ShowDevicesResult(fullPath, RocksDBUtils.isAligned(b)));
+          try {
+            res.add(
+                new ShowDevicesResult(
+                    fullPath,
+                    RocksDBUtils.isAligned(b),
+                    getBelongedToSG(plan.getPath().getNodes())));
+          } catch (MetadataException e) {
+            logger.error(e.getMessage());
+            return false;
+          }
           return true;
         };
     traverseOutcomeBasins(
         plan.getPath().getNodes(), MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_ENTITY});
     return res;
   }
+
+  private String getBelongedToSG(String[] nodes) throws MetadataException {
+    List<String> contextNodeName = new ArrayList<>();
+    for (int idx = 1; idx < nodes.length; idx++) {
+      contextNodeName.add(nodes[idx]);
+      String innerName =
+          RocksDBUtils.convertPartialPathToInnerByNodes(
+              contextNodeName.toArray(new String[0]), contextNodeName.size(), NODE_TYPE_SG);
+      byte[] queryResult;
+      try {
+        queryResult = readWriteHandler.get(null, innerName.getBytes());
+      } catch (RocksDBException e) {
+        throw new MetadataException(e);
+      }
+      if (queryResult != null) {
+        return RocksDBUtils.concatNodesName(nodes, 0, idx);
+      }
+    }
+    return StringUtil.EMPTY_STRING;
+  }
   // endregion
 
   // region Interfaces for timeseries, measurement and schema info Query
@@ -1658,7 +1685,12 @@ public class MRocksDBManager implements IMetaManager {
               // todo need update these properties
               tsRow[0] = measurementPath.getMeasurementAlias();
               // sg name
-              tsRow[1] = measurementPath.getFullPath();
+              try {
+                tsRow[1] = getBelongedToSG(measurementPath.getNodes());
+              } catch (MetadataException e) {
+                logger.error(e.getMessage());
+                tsRow[1] = StringUtil.EMPTY_STRING;
+              }
               tsRow[2] = measurementPath.getMeasurementSchema().getType().toString();
               tsRow[3] = measurementPath.getMeasurementSchema().getEncodingType().toString();
               tsRow[4] = measurementPath.getMeasurementSchema().getCompressor().toString();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index ee0c29b..8ac3467 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -555,4 +555,12 @@ public class RocksDBUtils {
     }
     return allNodesArray;
   }
+
+  public static String concatNodesName(String[] nodes, int startIdx, int endIdx) {
+    StringBuilder stringBuilder = new StringBuilder();
+    for (int i = startIdx; i <= endIdx; i++) {
+      stringBuilder.append(PATH_SEPARATOR).append(nodes[i]);
+    }
+    return stringBuilder.substring(1);
+  }
 }
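
For orientation, the two helpers introduced here behave roughly as follows (illustrative path names; this assumes RockDBConstants.PATH_SEPARATOR is the usual dot). concatNodesName joins an inclusive index range of path nodes; getBelongedToSG probes successively longer prefixes until an SG-typed key is found in RocksDB.

    String[] nodes = new String[] {"root", "sg1", "d1", "s1"};
    RocksDBUtils.concatNodesName(nodes, 0, 1); // -> "root.sg1"
    RocksDBUtils.concatNodesName(nodes, 0, 2); // -> "root.sg1.d1"
    // getBelongedToSG(nodes) queries the SG-typed inner key for "root.sg1" first; since
    // readWriteHandler.get(...) returns a non-null value for a real storage group, it answers
    // "root.sg1" without probing the longer prefixes.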

[iotdb] 39/45: delete unused method

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 95d746c3a56bb2a50662a2e19ed5bf46eaf4bd07
Author: lisijia <li...@360.cn>
AuthorDate: Tue Mar 15 10:43:56 2022 +0800

    delete unused method
---
 .../iotdb/db/metadata/rocksdb/RocksDBUtils.java    | 36 +++-------------------
 1 file changed, 4 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index 680696c..a46a55f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -26,11 +26,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import com.google.common.primitives.Bytes;
 import org.apache.commons.lang3.ArrayUtils;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +37,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
@@ -86,12 +83,6 @@ public class RocksDBUtils {
     NODE_TYPE_ARRAY[NODE_TYPE_ALIAS] = RocksDBMNodeType.ALISA;
   }
 
-  protected static byte[] constructDataBlock(byte type, String data) {
-    byte[] dataInBytes = data.getBytes();
-    int size = dataInBytes.length;
-    return Bytes.concat(new byte[] {type}, BytesUtils.intToBytes(size), dataInBytes);
-  }
-
   protected static byte[] toInternalNodeKey(String levelPath) {
     return toRocksDBKey(levelPath, NODE_TYPE_INTERNAL);
   }
@@ -436,7 +427,9 @@ public class RocksDBUtils {
   }
 
   public static String[] toMetaNodes(byte[] rocksdbKey) {
-    String rawKey = new String(BytesUtils.subBytes(rocksdbKey, 1, rocksdbKey.length - 1));
+    String rawKey =
+        new String(
+            Objects.requireNonNull(BytesUtils.subBytes(rocksdbKey, 1, rocksdbKey.length - 1)));
     String[] nodes = rawKey.split(ESCAPE_PATH_SEPARATOR);
     nodes[0] = ROOT_STRING;
     for (int i = 1; i < nodes.length; i++) {
@@ -445,27 +438,6 @@ public class RocksDBUtils {
     return nodes;
   }
 
-  /**
-   * Statistics the number of all data entries for a specified column family
-   *
-   * @param rocksDB rocksdb
-   * @param columnFamilyHandle specified column family handle
-   * @return total number in this column family
-   */
-  public static long countNodesNum(RocksDB rocksDB, ColumnFamilyHandle columnFamilyHandle) {
-    RocksIterator iter;
-    if (columnFamilyHandle == null) {
-      iter = rocksDB.newIterator();
-    } else {
-      iter = rocksDB.newIterator(columnFamilyHandle);
-    }
-    long count = 0;
-    for (iter.seekToFirst(); iter.isValid(); iter.next()) {
-      count++;
-    }
-    return count;
-  }
-
   public static String getPathByInnerName(String innerName) {
     char[] keyConvertToCharArray = innerName.toCharArray();
     StringBuilder stringBuilder = new StringBuilder();

[iotdb] 43/45: print exception info

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1599164190a7b59e329a38ae163d3f113ab870e1
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 16 17:29:34 2022 +0800

    print exception info
---
 .../java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java   | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index f623398..95f23e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -1167,6 +1167,11 @@ public class MRocksDBManager implements IMetaManager {
   @Override
   public int getNodesCountInGivenLevel(PartialPath pathPattern, int level, boolean isPrefixMatch)
       throws MetadataException {
+    // todo support wildcard
+    if (pathPattern.getFullPath().contains(ONE_LEVEL_PATH_WILDCARD)) {
+      throw new MetadataException(
+          "Wildcards are not currently supported for this operation [COUNT NODES pathPattern].");
+    }
     String innerNameByLevel =
         RocksDBUtils.getLevelPath(pathPattern.getNodes(), pathPattern.getNodeLength() - 1, level);
     for (RocksDBMNodeType type : RocksDBMNodeType.values()) {
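
The guard makes COUNT NODES reject wildcard patterns up front instead of feeding them into the exact level-path lookup that follows. Roughly (illustrative statements):

    // COUNT NODES root.sg.** LEVEL 3   -> MetadataException("Wildcards are not currently supported ...")
    // COUNT NODES root.sg1.d1 LEVEL 3  -> falls through to the per-node-type getLevelPath lookup

Note the check is contains(ONE_LEVEL_PATH_WILDCARD), i.e. contains("*"), so it catches both "*" and "**".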

[iotdb] 42/45: Merge branch 'rocksdb/dev' of v.src.corp.qihoo.net:xt_hadoop/iotdb into rocksdb/dev

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3cbf605d7f7d76d7c34c9487a8a5ae6e6587bf3d
Merge: 4a39a9e ccd5604
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 16 16:45:48 2022 +0800

    Merge branch 'rocksdb/dev' of v.src.corp.qihoo.net:xt_hadoop/iotdb into rocksdb/dev

 .../java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)


[iotdb] 40/45: Fix the bug that path in PathNotExistException is incomplete when do insert

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ccd5604758db72fd45fef9e26f59e40f08efa595
Author: chengjianyun <ch...@360.cn>
AuthorDate: Mon Mar 14 14:49:48 2022 +0800

    Fix the bug that path in PathNotExistException is incomplete when do insert
---
 .../java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index bf6ccfa..259b073 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -2281,6 +2281,9 @@ public class MRocksDBManager implements IMetaManager {
       PartialPath path = new PartialPath(devicePath.getFullPath(), measurementList[i]);
       IMeasurementMNode node = getMeasurementMNode(path);
       if (node == null) {
+        if (!config.isAutoCreateSchemaEnabled()) {
+          throw new PathNotExistException(path.getFullPath());
+        }
         missingNodeIndex.put(i, path);
       } else {
         nodeMap.put(i, node);
@@ -2289,10 +2292,6 @@ public class MRocksDBManager implements IMetaManager {
 
     // create missing nodes
     if (!missingNodeIndex.isEmpty()) {
-      if (!config.isAutoCreateSchemaEnabled()) {
-        throw new PathNotExistException(devicePath + PATH_SEPARATOR);
-      }
-
       if (!(plan instanceof InsertRowPlan) && !(plan instanceof InsertTabletPlan)) {
         throw new MetadataException(
             String.format(
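
With auto-creation disabled, the exception is now raised while scanning the measurements, and it carries the full missing series path rather than the device prefix. Illustrative series names, not from the patch:

    // insert into root.sg1.d1(time, s9) values (1, 1.0)   -- s9 has no schema
    // before: PathNotExistException built from "root.sg1.d1." (device path plus a bare separator)
    // after : PathNotExistException built from "root.sg1.d1.s9" (the concrete missing path)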

[iotdb] 30/45: update exception info

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 01736f14870a8411914b0147033f8c034dd4cf34
Author: lisijia <li...@360.cn>
AuthorDate: Fri Mar 11 10:48:34 2022 +0800

    update exception info
---
 .../java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 19c972d..8139977 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -618,15 +618,15 @@ public class MRocksDBManager implements IMetaManager {
             }
 
             if (!checkResult.getResult(RocksDBMNodeType.ENTITY)) {
-              throw new PathAlreadyExistException("Node already exists but not entity");
+              throw new MetadataException("Node already exists but not entity");
             }
 
             if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
-              throw new PathAlreadyExistException("Entity node exists but not aligned");
+              throw new MetadataException("Entity node exists but not aligned");
             }
           } else if (checkResult.getResult(RocksDBMNodeType.MEASUREMENT)
               || checkResult.getResult(RocksDBMNodeType.ALISA)) {
-            throw new PathAlreadyExistException("Path contains measurement node");
+            throw new MetadataException("Path contains measurement node");
           }
         }
       } catch (Exception e) {

[iotdb] 45/45: fix bug where storage groups were displayed incorrectly by show devices

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1589801cc24195c296406112be904eefd58c6e12
Author: lisijia <li...@360.cn>
AuthorDate: Thu Mar 17 12:10:02 2022 +0800

    fix bug where storage groups were displayed incorrectly by show devices
---
 .../main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 459e924..f3c3da2 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -1516,7 +1516,7 @@ public class MRocksDBManager implements IMetaManager {
                 new ShowDevicesResult(
                     fullPath,
                     RocksDBUtils.isAligned(b),
-                    getBelongedToSG(plan.getPath().getNodes())));
+                    getBelongedToSG(MetaUtils.splitPathToDetachedPath(fullPath))));
           } catch (MetadataException e) {
             logger.error(e.getMessage());
             return false;
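
The one-line change matters because plan.getPath() is the query pattern and may still contain wildcards, whereas fullPath is the concrete device key produced by the traversal, so the storage group lookup now runs on a real path. Hypothetical example:

    // SHOW DEVICES root.**    matches the device root.sg1.d1
    // before: getBelongedToSG({"root", "**"})        -> no SG prefix matches, SG column falls back to ""
    // after : getBelongedToSG({"root", "sg1", "d1"}) -> "root.sg1"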

[iotdb] 20/45: [RcoksDB] refine delete timeseries logic

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c1d4303705ea01759f6e66e2a404b45ac94c2145
Author: chengjianyun <ch...@360.cn>
AuthorDate: Wed Mar 9 14:25:30 2022 +0800

    [RcoksDB] refine delete timeseries logic
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 159 ++++++++++-----------
 .../iotdb/db/metadata/rocksdb/RocksDBUtils.java    |  31 +---
 2 files changed, 82 insertions(+), 108 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 7ae7e50..3c8523f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.metadata.rocksdb;
 
 import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
 import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
@@ -122,17 +121,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_NODE_VALUE;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_SCHEMA;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 /**
@@ -641,89 +630,93 @@ public class MRocksDBManager implements IMetaManager {
   @Override
   public String deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
-    List<MeasurementPath> allTimeseries = getMeasurementPaths(pathPattern, isPrefixMatch);
-    if (allTimeseries.isEmpty()) {
-      // In the cluster mode, the deletion of a timeseries will be forwarded to all the nodes. For
-      // nodes that do not have the metadata of the timeseries, the coordinator expects a
-      // PathNotExistException.
-      throw new PathNotExistException(pathPattern.getFullPath());
-    }
-
     Set<String> failedNames = new HashSet<>();
-    for (PartialPath p : allTimeseries) {
-      try {
-        String[] nodes = p.getNodes();
-        if (nodes.length == 0 || !IoTDBConstant.PATH_ROOT.equals(nodes[0])) {
-          throw new IllegalPathException(p.getFullPath());
-        }
-
-        // Delete measurement node
-        String mLevelPath = RocksDBUtils.getLevelPath(p.getNodes(), p.getNodeLength() - 1);
-        Lock lock = locksPool.computeIfAbsent(mLevelPath, x -> new ReentrantLock());
-        RMeasurementMNode deletedNode;
-        if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+    traverseOutcomeBasins(
+        pathPattern.getNodes(),
+        MAX_PATH_DEPTH,
+        (key, value) -> {
+          String path = null;
+          RMeasurementMNode deletedNode = null;
           try {
-            deletedNode = (RMeasurementMNode) getMeasurementMNode(p);
-            WriteBatch batch = new WriteBatch();
-            // delete the last node of path
-            byte[] mNodeKey = RocksDBUtils.toMeasurementNodeKey(mLevelPath);
-            batch.delete(mNodeKey);
-            if (deletedNode.getAlias() != null) {
-              batch.delete(RocksDBUtils.toAliasNodeKey(mLevelPath));
-            }
-            if (deletedNode.getTags() != null && !deletedNode.getTags().isEmpty()) {
-              batch.delete(readWriteHandler.getCFHByName(TABLE_NAME_TAGS), mNodeKey);
-              // TODO: tags invert index update
+            path = RocksDBUtils.getPathByInnerName(key);
+            PartialPath partialPath = new PartialPath(path);
+            String levelPath =
+                RocksDBUtils.getLevelPath(partialPath.getNodes(), partialPath.getNodeLength() - 1);
+            // Delete measurement node
+            Lock lock = locksPool.get(levelPath);
+            if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+              try {
+                deletedNode = new RMeasurementMNode(path, value);
+                WriteBatch batch = new WriteBatch();
+                // delete the last node of path
+                batch.delete(key);
+                if (deletedNode.getAlias() != null) {
+                  String[] aliasNodes =
+                      Arrays.copyOf(partialPath.getNodes(), partialPath.getNodeLength());
+                  aliasNodes[partialPath.getNodeLength() - 1] = deletedNode.getAlias();
+                  String aliasLevelPath =
+                      RocksDBUtils.getLevelPath(aliasNodes, aliasNodes.length - 1);
+                  batch.delete(RocksDBUtils.toAliasNodeKey(aliasLevelPath));
+                }
+                if (deletedNode.getTags() != null && !deletedNode.getTags().isEmpty()) {
+                  batch.delete(readWriteHandler.getCFHByName(TABLE_NAME_TAGS), key);
+                  // TODO: tags invert index update
+                }
+                readWriteHandler.executeBatch(batch);
+              } finally {
+                lock.unlock();
+              }
+            } else {
+              throw new AcquireLockTimeoutException("acquire lock timeout, " + path);
             }
-            readWriteHandler.executeBatch(batch);
-          } finally {
-            lock.unlock();
+          } catch (Exception e) {
+            logger.error("delete timeseries [{}] fail", path, e);
+            failedNames.add(path);
           }
-        } else {
-          throw new AcquireLockTimeoutException("acquire lock timeout, " + p.getFullPath());
-        }
 
-        // delete parent node if is empty
-        IMNode curNode = deletedNode;
-        while (curNode != null) {
-          PartialPath curPath = curNode.getPartialPath();
-          String curLevelPath =
-              RocksDBUtils.getLevelPath(curPath.getNodes(), curPath.getNodeLength() - 1);
+          // delete parent node if is empty
+          IMNode parentNode = deletedNode.getParent();
-          Lock curLock = locksPool.computeIfAbsent(curLevelPath, x -> new ReentrantLock());
-          if (curLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
-            try {
-              IMNode toDelete = curNode.getParent();
-              if (toDelete == null || toDelete.isStorageGroup()) {
+          try {
+            while (parentNode != null) {
+              // TODO: check children size
+              if (!parentNode.isEmptyInternal() || parentNode.isStorageGroup()) {
                 break;
-              }
-
-              if (toDelete.isEmptyInternal()) {
-                if (toDelete.isEntity()) {
-                  // TODO: aligned timeseries needs special check????
-                  readWriteHandler.deleteNode(
-                      toDelete.getPartialPath().getNodes(), RocksDBMNodeType.ENTITY);
+              } else {
+                PartialPath parentPath = parentNode.getPartialPath();
+                String parentLevelPath =
+                    RocksDBUtils.getLevelPath(
+                        parentPath.getNodes(), parentPath.getNodeLength() - 1);
+                Lock curLock = locksPool.get(parentLevelPath);
+                if (curLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+                  try {
+                    if (parentNode.isEntity()) {
+                      // TODO: aligned timeseries needs special check????
+                      readWriteHandler.deleteNode(
+                          parentNode.getPartialPath().getNodes(), RocksDBMNodeType.ENTITY);
+                    } else {
+                      readWriteHandler.deleteNode(
+                          parentNode.getPartialPath().getNodes(), RocksDBMNodeType.INTERNAL);
+                    }
+                    parentNode = parentNode.getParent();
+                  } finally {
+                    curLock.unlock();
+                  }
                 } else {
-                  readWriteHandler.deleteNode(
-                      toDelete.getPartialPath().getNodes(), RocksDBMNodeType.INTERNAL);
+                  throw new AcquireLockTimeoutException("acquire lock timeout, " + parentLevelPath);
                 }
-                curNode = toDelete;
-              } else {
-                break;
               }
-            } finally {
-              curLock.unlock();
             }
-          } else {
-            throw new AcquireLockTimeoutException("acquire lock timeout, " + curNode.getFullPath());
+            // TODO: trigger engine update
+            // TODO: update totalTimeSeriesNumber
+          } catch (Exception e) {
+            logger.error("delete timeseries [{}] fail", parentNode.getFullPath(), e);
+            failedNames.add(parentNode.getFullPath());
           }
-        }
-        // TODO: trigger engine update
-        // TODO: update totalTimeSeriesNumber
-      } catch (Exception e) {
-        logger.error("delete timeseries [{}] fail", p.getFullPath(), e);
-        failedNames.add(p.getFullPath());
-      }
-    }
+          return true;
+        },
+        new Character[] {NODE_TYPE_MEASUREMENT});
     return failedNames.isEmpty() ? null : String.join(",", failedNames);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index 8ac3467..a1d37cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -43,33 +43,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_ROOT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ALIAS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ATTRIBUTES;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ORIGIN_KEY;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_TAGS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_TTL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_VERSION;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_FLAG;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ESCAPE_PATH_SEPARATOR;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_ALIAS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_ATTRIBUTES;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_TAGS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.*;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ALIAS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_INTERNAL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.PATH_SEPARATOR;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT_CHAR;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT_STRING;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
 
 public class RocksDBUtils {
 
@@ -198,6 +175,10 @@ public class RocksDBUtils {
       flag = (byte) (flag | FLAG_HAS_ATTRIBUTES);
     }
 
+    if (schema != null) {
+      flag = (byte) (flag | FLAG_IS_SCHEMA);
+    }
+
     ReadWriteIOUtils.write(flag, outputStream);
 
     if (schema != null) {
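
The rewrite drops the up-front getMeasurementPaths() materialization: matched measurement entries are now deleted inside a BiFunction<byte[], byte[], Boolean> callback passed to traverseOutcomeBasins, the same overload used for show devices earlier, with failures collected in failedNames. Stripped of locking and parent cleanup, the callback's shape is roughly (a sketch, not the literal patch):

    traverseOutcomeBasins(
        pathPattern.getNodes(),
        MAX_PATH_DEPTH,
        (key, value) -> {
          // key/value are the RocksDB entry of one matched measurement node
          String path = RocksDBUtils.getPathByInnerName(new String(key));
          // ... delete the node key, its alias key and tag entry under the per-path lock,
          // then walk up and drop parents that became empty ...
          return true; // per-entry result consumed by traverseOutcomeBasins (handling not shown here)
        },
        new Character[] {NODE_TYPE_MEASUREMENT});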

[iotdb] 01/45: use wildcard to query

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5d015a1aca975d239293b3b1003bf0d96fd07b97
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 2 10:49:33 2022 +0800

    use wildcard to query
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 84 +++++++++++++++++++---
 .../iotdb/db/metadata/rocksdb/RocksDBUtils.java    | 41 +----------
 .../db/metadata/rocksdb/MRocksDBUnitTest.java      |  2 +-
 3 files changed, 80 insertions(+), 47 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 743e34a..37821ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -109,12 +109,27 @@ import java.util.Stack;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_NODE_VALUE;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_SCHEMA;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
+import static org.apache.iotdb.db.metadata.rocksdb.RocksDBUtils.getAllCompoundMode;
+import static org.apache.iotdb.db.metadata.rocksdb.RocksDBUtils.newStringArray;
+import static org.apache.iotdb.db.metadata.rocksdb.RocksDBUtils.replaceWildcard;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 /**
@@ -940,19 +955,57 @@ public class MRocksDBManager implements IMetaManager {
   @Override
   public int getAllTimeseriesCount(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
+    return getCountByNodeType(new Character[] {NODE_TYPE_MEASUREMENT}, pathPattern.getNodes());
+  }
 
-    return getKeyNumByPrefix(pathPattern, NODE_TYPE_MEASUREMENT, isPrefixMatch).size();
+  // eg. root.a.*.**.b.**.c
+  public void replaceMultiWildcard(
+      String[] nodes, int maxLevel, Consumer<String> consumer, Character[] nodeTypeArray)
+      throws IllegalPathException {
+    List<Integer> multiWildcardPosition = new ArrayList<>();
+    for (int i = 0; i < nodes.length; i++) {
+      if (MULTI_LEVEL_PATH_WILDCARD.equals(nodes[i])) {
+        multiWildcardPosition.add(i);
+      }
+    }
+    if (multiWildcardPosition.isEmpty()) {
+      traverseByPatternPath(nodes, consumer, nodeTypeArray);
+    } else if (multiWildcardPosition.size() == 1) {
+      for (int i = 1; i <= maxLevel - nodes.length + 2; i++) {
+        String[] clone = nodes.clone();
+        clone[multiWildcardPosition.get(0)] = replaceWildcard(i);
+        traverseByPatternPath(newStringArray(clone), consumer, nodeTypeArray);
+      }
+    } else {
+      for (int sum = multiWildcardPosition.size();
+          sum <= maxLevel - (nodes.length - multiWildcardPosition.size() - 1);
+          sum++) {
+        List<int[]> result = getAllCompoundMode(sum, multiWildcardPosition.size());
+        for (int[] value : result) {
+          String[] clone = nodes.clone();
+          for (int i = 0; i < value.length; i++) {
+            clone[multiWildcardPosition.get(i)] = replaceWildcard(value[i]);
+          }
+          traverseByPatternPath(newStringArray(clone), consumer, nodeTypeArray);
+        }
+      }
+    }
   }
 
-  public void traverseByPatternPath(PartialPath pathPattern) {
-    String[] nodes = pathPattern.getNodes();
+  public void traverseByPatternPath(
+      String[] nodes, Consumer<String> consumer, Character[] nodeTypeArray) {
+    //    String[] nodes = pathPattern.getNodes();
 
     int startIndex = 0;
     List<String[]> scanKeys = new ArrayList<>();
 
     int indexOfPrefix = indexOfFirstWildcard(nodes, startIndex);
     if (indexOfPrefix >= nodes.length) {
-      System.out.println("matched key: " + pathPattern.getFullPath());
+      StringBuilder stringBuilder = new StringBuilder();
+      for (int i = 0; i < nodes.length; i++) {
+        stringBuilder.append(RockDBConstants.PATH_SEPARATOR).append(nodes[i]);
+      }
+      consumer.accept(stringBuilder.substring(1));
       return;
     }
 
@@ -979,7 +1032,7 @@ public class MRocksDBManager implements IMetaManager {
               prefixNodes -> {
                 String levelPrefix =
                     RocksDBUtils.getLevelPathPrefix(prefixNodes, prefixNodes.length - 1, level);
-                Arrays.stream(ALL_NODE_TYPE_ARRAY)
+                Arrays.stream(nodeTypeArray)
                     .parallel()
                     .forEach(
                         x -> {
@@ -993,6 +1046,7 @@ public class MRocksDBManager implements IMetaManager {
                             if (RocksDBUtils.suffixMatch(iterator.key(), suffixToMatch)) {
                               if (lastIteration) {
                                 System.out.println("matched key: " + new String(iterator.key()));
+                                consumer.accept(new String(iterator.key()));
                               } else {
                                 tempNodes.add(RocksDBUtils.toMetaNodes(iterator.key()));
                               }
@@ -1120,7 +1174,21 @@ public class MRocksDBManager implements IMetaManager {
   @Override
   public int getDevicesNum(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
-    return getKeyNumByPrefix(pathPattern, NODE_TYPE_ENTITY, isPrefixMatch).size();
+    return getCountByNodeType(new Character[] {NODE_TYPE_ENTITY}, pathPattern.getNodes());
+  }
+
+  private int getCountByNodeType(Character[] nodetype, String[] nodes) throws IllegalPathException {
+    AtomicInteger atomicInteger = new AtomicInteger(0);
+    Consumer<String> consumer =
+        new Consumer<String>() {
+          @Override
+          public void accept(String s) {
+            atomicInteger.incrementAndGet();
+          }
+        };
+
+    replaceMultiWildcard(nodes, MAX_PATH_DEPTH, consumer, nodetype);
+    return atomicInteger.get();
   }
 
   @Override
@@ -1136,7 +1204,7 @@ public class MRocksDBManager implements IMetaManager {
   @Override
   public int getStorageGroupNum(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
-    return getKeyNumByPrefix(pathPattern, NODE_TYPE_SG, isPrefixMatch).size();
+    return getCountByNodeType(new Character[] {NODE_TYPE_SG}, pathPattern.getNodes());
   }
 
   /**
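[editor note] replaceMultiWildcard above rewrites every ** in a pattern into bounded runs of single-level * before the scan runs, since the traversal works over level-prefixed keys. A minimal, self-contained sketch of the single-** branch follows; the class name and the '.' separator are illustrative stand-ins, and the real code hands each expanded pattern to traverseByPatternPath instead of printing it.

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the single-** branch: the ** at index 2 is replaced by
    // 1..(maxLevel - nodes.length + 2) copies of *, mirroring replaceWildcard.
    public class WildcardExpansionSketch {

      static String replaceWildcard(int num) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < num; i++) {
          sb.append('.').append('*');
        }
        return sb.substring(1);
      }

      public static void main(String[] args) {
        String[] nodes = {"root", "a", "**", "c"};
        int maxLevel = 5;
        List<String> expanded = new ArrayList<>();
        for (int i = 1; i <= maxLevel - nodes.length + 2; i++) {
          String[] clone = nodes.clone();
          clone[2] = replaceWildcard(i);
          expanded.add(String.join(".", clone));
        }
        // prints [root.a.*.c, root.a.*.*.c, root.a.*.*.*.c]
        System.out.println(expanded);
      }
    }
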
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index d3b585c..7cf4b76 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -43,7 +43,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_ROOT;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ALIAS;
@@ -472,41 +471,7 @@ public class RocksDBUtils {
     return stringBuilder.toString();
   }
 
-  // eg. root.a.*.**.b.**.c
-  public static List<String[]> replaceMultiWildcard(String[] nodes, int maxLevel)
-      throws IllegalPathException {
-    List<String[]> allResult = new ArrayList<>();
-    List<Integer> multiWildcardPosition = new ArrayList<>();
-    for (int i = 0; i < nodes.length; i++) {
-      if (MULTI_LEVEL_PATH_WILDCARD.equals(nodes[i])) {
-        multiWildcardPosition.add(i);
-      }
-    }
-    if (multiWildcardPosition.isEmpty()) {
-      allResult.add(nodes);
-    } else if (multiWildcardPosition.size() == 1) {
-      for (int i = 1; i <= maxLevel - nodes.length + 2; i++) {
-        String[] clone = nodes.clone();
-        clone[multiWildcardPosition.get(0)] = replaceWildcard(i);
-        allResult.add(newStringArray(clone));
-      }
-    } else {
-      List<int[]> result =
-          getAllCompoundMode(
-              maxLevel - (nodes.length - multiWildcardPosition.size() - 1),
-              multiWildcardPosition.size());
-      for (int[] value : result) {
-        String[] clone = nodes.clone();
-        for (int i = 0; i < value.length; i++) {
-          clone[multiWildcardPosition.get(i)] = replaceWildcard(value[i]);
-        }
-        allResult.add(newStringArray(clone));
-      }
-    }
-    return allResult;
-  }
-
-  private static String[] newStringArray(String[] oldArray) throws IllegalPathException {
+  public static String[] newStringArray(String[] oldArray) throws IllegalPathException {
     StringBuilder stringBuilder = new StringBuilder();
     for (String str : oldArray) {
       stringBuilder.append(PATH_SEPARATOR).append(str);
@@ -514,7 +479,7 @@ public class RocksDBUtils {
     return MetaUtils.splitPathToDetachedPath(stringBuilder.substring(1));
   }
 
-  private static String replaceWildcard(int num) {
+  public static String replaceWildcard(int num) {
     StringBuilder stringBuilder = new StringBuilder();
     for (int i = 0; i < num; i++) {
       stringBuilder.append(RockDBConstants.PATH_SEPARATOR).append(ONE_LEVEL_PATH_WILDCARD);
@@ -522,7 +487,7 @@ public class RocksDBUtils {
     return stringBuilder.substring(1);
   }
 
-  private static List<int[]> getAllCompoundMode(int sum, int n) {
+  public static List<int[]> getAllCompoundMode(int sum, int n) {
     if (n <= 2) {
       List<int[]> result = new ArrayList<>();
       for (int i = 1; i < sum; i++) {
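[editor note] getAllCompoundMode(sum, n), made public here so the manager can call it directly, enumerates the ordered ways of writing sum as n positive parts; each part becomes the number of single-level wildcards substituted for one ** occurrence. Only the n <= 2 base case is visible in the hunk above, so the recursive variant below is an assumption-labeled sketch of the same idea, not the committed implementation.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Sketch: all ordered compositions of `sum` into `n` positive parts.
    public class CompositionSketch {
      static List<int[]> compositions(int sum, int n) {
        List<int[]> result = new ArrayList<>();
        if (n == 1) {
          if (sum >= 1) result.add(new int[] {sum});
          return result;
        }
        for (int first = 1; first <= sum - (n - 1); first++) {
          for (int[] rest : compositions(sum - first, n - 1)) {
            int[] full = new int[n];
            full[0] = first;
            System.arraycopy(rest, 0, full, 1, n - 1);
            result.add(full);
          }
        }
        return result;
      }

      public static void main(String[] args) {
        // e.g. a pattern with two ** and three intermediate levels to spend:
        compositions(3, 2).forEach(c -> System.out.println(Arrays.toString(c)));
        // prints [1, 2] and [2, 1]
      }
    }
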
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java
index df6cfd2..a0ce63f 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBUnitTest.java
@@ -216,7 +216,7 @@ public class MRocksDBUnitTest {
           path, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, null);
     }
 
-    mRocksDBManager.traverseByPatternPath(new PartialPath("root.sg.d1.*"));
+    //    mRocksDBManager.traverseByPatternPath(new PartialPath("root.sg.d1.*"));
   }
 
   @After
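[editor note] getAllTimeseriesCount, getDevicesNum and getStorageGroupNum now all funnel through getCountByNodeType, which threads a Consumer<String> into the traversal and counts matches with an AtomicInteger instead of materializing a key set just to call size() on it. A tiny self-contained illustration of that pattern; the stream below is a stand-in for the RocksDB scan.

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Consumer;
    import java.util.stream.Stream;

    // Counting-consumer sketch: the traversal calls the consumer once per
    // matched key and the caller only keeps a thread-safe counter.
    public class CountingConsumerSketch {
      public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger(0);
        Consumer<String> onMatch = key -> count.incrementAndGet();

        Stream.of("root.sg.d1.s1", "root.sg.d1.s2", "root.sg.d2")
            .parallel()
            .filter(key -> key.split("\\.").length == 4) // pretend "is a measurement"
            .forEach(onMatch);

        System.out.println("matched: " + count.get()); // 2
      }
    }
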

[iotdb] 22/45: spotless apply

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 61d9eddd3d2c1e70c8a3c858cfefe46896aa3d35
Author: chengjianyun <ch...@360.cn>
AuthorDate: Thu Mar 10 10:04:11 2022 +0800

    spotless apply
---
 .../main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index 4aed3a7..38906c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -44,9 +44,9 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.PATH_SEPARATOR;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
 
 public class RocksDBUtils {
 

[iotdb] 08/45: [rocksdb] complete data transfer

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 06fe60e147bee4666fbc67a56d9be1f5de2def3a
Author: chengjianyun <ch...@360.cn>
AuthorDate: Fri Mar 4 15:43:56 2022 +0800

    [rocksdb] complete data transfer
---
 .../resources/conf/iotdb-engine.properties         |   8 +
 .../assembly/resources/tools/rocksdb-transfer.bat  | 126 +++++++++
 .../assembly/resources/tools/rocksdb-transfer.sh   |  82 ++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  11 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../iotdb/db/metadata/MetadataManagerType.java     |  14 +
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java |   1 -
 .../db/metadata/rocksdb/MetaDataTransfer.java      | 283 +++++++++++++--------
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  10 +-
 9 files changed, 426 insertions(+), 114 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 7150de3..fe6d101 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -515,6 +515,14 @@ timestamp_precision=ms
 # enable_id_table_log_file=false
 
 ####################
+### Metadata Configuration
+####################
+
+# Which metadata manager to be used, right now MEMORY_MANAGER and ROCKSDB_MANAGER are supported. Default MEMORY_MANAGER.
+# Datatype: string
+# meta_data_manager=MEMORY_MANAGER
+
+####################
 ### Metadata Cache Configuration
 ####################
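[editor note] The new section above adds a single switch for the metadata backend: leaving meta_data_manager unset (or set to MEMORY_MANAGER) keeps the existing MManager, while setting meta_data_manager=ROCKSDB_MANAGER in iotdb-engine.properties selects the RocksDB-backed MRocksDBManager on this branch. The accepted values come from the MetadataManagerType enum added later in this commit, and an unrecognized value falls back to MEMORY_MANAGER rather than failing startup.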
 
diff --git a/server/src/assembly/resources/tools/rocksdb-transfer.bat b/server/src/assembly/resources/tools/rocksdb-transfer.bat
new file mode 100644
index 0000000..8fa5433
--- /dev/null
+++ b/server/src/assembly/resources/tools/rocksdb-transfer.bat
@@ -0,0 +1,126 @@
+@REM
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements.  See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership.  The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License.  You may obtain a copy of the License at
+@REM
+@REM     http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied.  See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM
+
+@echo off
+echo ````````````````````````
+echo Starting IoTDB
+echo ````````````````````````
+
+
+set PATH="%JAVA_HOME%\bin\";%PATH%
+set "FULL_VERSION="
+set "MAJOR_VERSION="
+set "MINOR_VERSION="
+
+
+for /f tokens^=2-5^ delims^=.-_+^" %%j in ('java -fullversion 2^>^&1') do (
+	set "FULL_VERSION=%%j-%%k-%%l-%%m"
+	IF "%%j" == "1" (
+	    set "MAJOR_VERSION=%%k"
+	    set "MINOR_VERSION=%%l"
+	) else (
+	    set "MAJOR_VERSION=%%j"
+	    set "MINOR_VERSION=%%k"
+	)
+)
+
+set JAVA_VERSION=%MAJOR_VERSION%
+
+@REM we do not check jdk that version less than 1.8 because they are too stale...
+IF "%JAVA_VERSION%" == "6" (
+	echo IoTDB only supports jdk >= 8, please check your java version.
+	goto finally
+)
+IF "%JAVA_VERSION%" == "7" (
+	echo IoTDB only supports jdk >= 8, please check your java version.
+	goto finally
+)
+
+if "%OS%" == "Windows_NT" setlocal
+
+pushd %~dp0..
+if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%cd%
+popd
+
+set IOTDB_CONF=%IOTDB_HOME%\conf
+set IOTDB_LOGS=%IOTDB_HOME%\logs
+
+@setlocal ENABLEDELAYEDEXPANSION ENABLEEXTENSIONS
+set is_conf_path=false
+for %%i in (%*) do (
+	IF "%%i" == "-c" (
+		set is_conf_path=true
+	) ELSE IF "!is_conf_path!" == "true" (
+		set is_conf_path=false
+		set IOTDB_CONF=%%i
+	) ELSE (
+		set CONF_PARAMS=!CONF_PARAMS! %%i
+	)
+)
+
+IF EXIST "%IOTDB_CONF%\iotdb-env.bat" (
+    CALL "%IOTDB_CONF%\iotdb-env.bat" %1
+    ) ELSE (
+    echo "can't find %IOTDB_CONF%\iotdb-env.bat"
+    )
+
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.metadata.rocksdb.MetaDataTransfer
+if NOT DEFINED JAVA_HOME goto :err
+
+@REM -----------------------------------------------------------------------------
+@REM JVM Opts we'll use in legacy run or installation
+set JAVA_OPTS=-ea^
+ -Dlogback.configurationFile="%IOTDB_CONF%\logback.xml"^
+ -DIOTDB_HOME="%IOTDB_HOME%"^
+ -DTSFILE_HOME="%IOTDB_HOME%"^
+ -DTSFILE_CONF="%IOTDB_CONF%"^
+ -DIOTDB_CONF="%IOTDB_CONF%"^
+ -Dsun.jnu.encoding=UTF-8^
+ -Dfile.encoding=UTF-8
+
+@REM ***** CLASSPATH library setting *****
+@REM Ensure that any user defined CLASSPATH variables are not used on startup
+set CLASSPATH="%IOTDB_HOME%\lib\*"
+set CLASSPATH=%CLASSPATH%;iotdb.IoTDB
+goto okClasspath
+
+:append
+set CLASSPATH=%CLASSPATH%;%1
+
+goto :eof
+
+@REM -----------------------------------------------------------------------------
+:okClasspath
+
+rem echo CLASSPATH: %CLASSPATH%
+
+"%JAVA_HOME%\bin\java" %JAVA_OPTS% %IOTDB_HEAP_OPTS% -cp %CLASSPATH% %IOTDB_JMX_OPTS% %MAIN_CLASS% %CONF_PARAMS%
+goto finally
+
+:err
+echo JAVA_HOME environment variable must be set!
+pause
+
+
+@REM -----------------------------------------------------------------------------
+:finally
+
+pause
+
+ENDLOCAL
diff --git a/server/src/assembly/resources/tools/rocksdb-transfer.sh b/server/src/assembly/resources/tools/rocksdb-transfer.sh
new file mode 100644
index 0000000..cb9040c
--- /dev/null
+++ b/server/src/assembly/resources/tools/rocksdb-transfer.sh
@@ -0,0 +1,82 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+echo ---------------------
+echo Starting IoTDB
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+  export IOTDB_HOME="`dirname "$0"`/.."
+fi
+
+IOTDB_CONF=${IOTDB_HOME}/conf
+# IOTDB_LOGS=${IOTDB_HOME}/logs
+
+is_conf_path=false
+for arg do
+  shift
+  if [ "$arg" == "-c" ]; then
+    is_conf_path=true
+    continue
+  fi
+  if [ $is_conf_path == true ]; then
+    IOTDB_CONF=$arg
+    is_conf_path=false
+    continue
+  fi
+  set -- "$@" "$arg"
+done
+
+CONF_PARAMS=$*
+
+if [ -f "$IOTDB_CONF/iotdb-env.sh" ]; then
+    if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
+      . "$IOTDB_CONF/iotdb-env.sh" "printgc"
+    else
+        . "$IOTDB_CONF/iotdb-env.sh"
+    fi
+else
+    echo "can't find $IOTDB_CONF/iotdb-env.sh"
+fi
+
+CLASSPATH=""
+for f in ${IOTDB_HOME}/lib/*.jar; do
+  CLASSPATH=${CLASSPATH}":"$f
+done
+classname=org.apache.iotdb.db.metadata.rocksdb.MetaDataTransfer
+
+launch_service()
+{
+	class="$1"
+	iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback.xml"
+	iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}"
+	iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
+	iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
+	iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
+	iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
+	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" $CONF_PARAMS
+	return $?
+}
+
+# Start up the service
+launch_service "$classname"
+
+exit $?
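[editor note] Both launcher scripts above set up the JVM the same way the server start scripts do and run org.apache.iotdb.db.metadata.rocksdb.MetaDataTransfer as the main class; like the server scripts they accept -c to point at a non-default conf directory. In a deployed distribution an invocation would look roughly like the following (the conf path is illustrative):

    sh tools/rocksdb-transfer.sh -c /path/to/iotdb/conf
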
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b6b23a8..1ee8a1d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.compaction.inner.InnerCompactionStrategy;
 import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
 import org.apache.iotdb.db.exception.LoadConfigurationException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.MetadataManagerType;
 import org.apache.iotdb.db.service.thrift.impl.InfluxDBServiceImpl;
 import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
 import org.apache.iotdb.rpc.RpcTransportFactory;
@@ -804,6 +805,8 @@ public class IoTDBConfig {
   /** Encryption provided class parameter */
   private String encryptDecryptProviderParameter;
 
+  private MetadataManagerType metadataManagerType = MetadataManagerType.MEMORY_MANAGER;
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -921,6 +924,14 @@ public class IoTDBConfig {
     this.timeIndexLevel = TimeIndexLevel.valueOf(timeIndexLevel);
   }
 
+  public void setMetadataManagerType(String type) {
+    metadataManagerType = MetadataManagerType.of(type);
+  }
+
+  public MetadataManagerType getMetadataManagerType() {
+    return metadataManagerType;
+  }
+
   void updatePath() {
     formulateFolders();
     confirmMultiDirStrategy();
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 0d5cb9e..5751a45 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1413,6 +1413,11 @@ public class IoTDBDescriptor {
             conf.getTimestampPrecision()));
   }
 
+  private void loadMetadataConfig(Properties properties) {
+    conf.setMetadataManagerType(
+        properties.getProperty("meta_data_manager", conf.getMetadataManagerType().name()));
+  }
+
   /** Get default encode algorithm by data type */
   public TSEncoding getDefaultEncodingByType(TSDataType dataType) {
     switch (dataType) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataManagerType.java b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataManagerType.java
new file mode 100644
index 0000000..402e2f4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataManagerType.java
@@ -0,0 +1,14 @@
+package org.apache.iotdb.db.metadata;
+
+public enum MetadataManagerType {
+  MEMORY_MANAGER,
+  ROCKSDB_MANAGER;
+
+  public static MetadataManagerType of(String value) {
+    try {
+      return Enum.valueOf(MetadataManagerType.class, value);
+    } catch (Exception e) {
+      return MEMORY_MANAGER;
+    }
+  }
+}
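[editor note] MetadataManagerType.of is deliberately lenient: anything Enum.valueOf cannot resolve, including lowercase spellings since valueOf is case-sensitive, silently falls back to MEMORY_MANAGER, so a typo in the properties file degrades to the in-memory manager instead of aborting startup. A small illustrative driver (the class name is made up; the enum is used exactly as added above):

    import org.apache.iotdb.db.metadata.MetadataManagerType;

    public class MetadataManagerTypeDemo {
      public static void main(String[] args) {
        // Exact constant name resolves normally.
        System.out.println(MetadataManagerType.of("ROCKSDB_MANAGER")); // ROCKSDB_MANAGER
        // valueOf is case-sensitive, so this falls back to the default.
        System.out.println(MetadataManagerType.of("rocksdb_manager")); // MEMORY_MANAGER
        // So does any unrecognized value.
        System.out.println(MetadataManagerType.of("no-such-manager")); // MEMORY_MANAGER
      }
    }
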
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index e408263..9ca93ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -1021,7 +1021,6 @@ public class MRocksDBManager implements IMetaManager {
                             }
                             if (RocksDBUtils.suffixMatch(iterator.key(), suffixToMatch)) {
                               if (lastIteration) {
-                                System.out.println("matched key: " + new String(iterator.key()));
                                 consumer.accept(
                                     RocksDBUtils.getPathByInnerName(new String(iterator.key())));
                               } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
index 84e02dd..c3c8906 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.logfile.MLogReader;
 import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.mtree.MTree;
@@ -51,12 +52,21 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MetaDataTransfer {
 
   private static final Logger logger = LoggerFactory.getLogger(MetaDataTransfer.class);
 
+  private static int DEFAULT_TRANSFER_THREAD_POOL_SIZE = 200;
+  private static int DEFAULT_TRANSFER_PLANS_BUFFER_SIZE = 100_000;
+
+  private ForkJoinPool forkJoinPool = new ForkJoinPool(DEFAULT_TRANSFER_THREAD_POOL_SIZE);
+
   private String mtreeSnapshotPath;
   private MRocksDBManager rocksDBManager;
   private MLogWriter mLogWriter;
@@ -77,12 +87,12 @@ public class MetaDataTransfer {
     try {
       MetaDataTransfer transfer = new MetaDataTransfer();
       transfer.doTransfer();
-    } catch (MetadataException | IOException e) {
+    } catch (MetadataException | IOException | ExecutionException | InterruptedException e) {
       e.printStackTrace();
     }
   }
 
-  public void doTransfer() throws IOException {
+  public void doTransfer() throws IOException, ExecutionException, InterruptedException {
     File failedFile = new File(failedMLogPath);
     if (failedFile.exists()) {
       failedFile.delete();
@@ -103,36 +113,34 @@ public class MetaDataTransfer {
 
     mtreeSnapshotPath = schemaDir + File.separator + MetadataConstant.MTREE_SNAPSHOT;
     File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
-    long time = System.currentTimeMillis();
     if (mtreeSnapshot.exists()) {
-      transferFromSnapshot(mtreeSnapshot);
-      logger.info("spend {} ms to transfer data from snapshot", System.currentTimeMillis() - time);
+      try {
+        doTransferFromSnapshot();
+      } catch (MetadataException e) {
+        logger.error("Fatal error, terminate data transfer!!!", e);
+      }
     }
 
-    time = System.currentTimeMillis();
     String logFilePath = schemaDir + File.separator + MetadataConstant.METADATA_LOG;
     File logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
     // init the metadata from the operation log
     if (logFile.exists()) {
       try (MLogReader mLogReader = new MLogReader(schemaDir, MetadataConstant.METADATA_LOG); ) {
         transferFromMLog(mLogReader);
-        logger.info(
-            "spend {} ms to deserialize mtree from mlog.bin", System.currentTimeMillis() - time);
       } catch (Exception e) {
         throw new IOException("Failed to parser mlog.bin for err:" + e);
       }
     } else {
-      logger.info("no mlog.bin file find, skip transfer");
+      logger.info("No mlog.bin file find, skip data transfer");
     }
-
     mLogWriter.close();
 
-    logger.info(
-        "do transfer complete with {} plan failed. Failed plan are persisted in mlog.bin.transfer_failed",
-        failedPlanCount.get());
+    logger.info("Transfer metadata from MManager to MRocksDBManager complete!");
   }
 
-  private void transferFromMLog(MLogReader mLogReader) {
+  private void transferFromMLog(MLogReader mLogReader)
+      throws IOException, MetadataException, ExecutionException, InterruptedException {
+    long time = System.currentTimeMillis();
     int idx = 0;
     PhysicalPlan plan;
     List<PhysicalPlan> nonCollisionCollections = new ArrayList<>();
@@ -142,90 +150,107 @@ public class MetaDataTransfer {
         idx++;
       } catch (Exception e) {
         logger.error("Parse mlog error at lineNumber {} because:", idx, e);
-        break;
+        throw e;
       }
       if (plan == null) {
         continue;
       }
-      try {
-        switch (plan.getOperatorType()) {
-          case CREATE_TIMESERIES:
-          case CREATE_ALIGNED_TIMESERIES:
-          case AUTO_CREATE_DEVICE_MNODE:
-            nonCollisionCollections.add(plan);
-            if (nonCollisionCollections.size() > 100000) {
-              executeOperation(nonCollisionCollections, true);
-            }
-            break;
-          case DELETE_TIMESERIES:
-          case SET_STORAGE_GROUP:
-          case DELETE_STORAGE_GROUP:
-          case TTL:
-          case CHANGE_ALIAS:
-            executeOperation(nonCollisionCollections, true);
+
+      switch (plan.getOperatorType()) {
+        case CREATE_TIMESERIES:
+        case CREATE_ALIGNED_TIMESERIES:
+        case AUTO_CREATE_DEVICE_MNODE:
+          nonCollisionCollections.add(plan);
+          if (nonCollisionCollections.size() > 100000) {
+            executeBufferedOperation(nonCollisionCollections);
+          }
+          break;
+        case SET_STORAGE_GROUP:
+        case DELETE_TIMESERIES:
+        case DELETE_STORAGE_GROUP:
+        case TTL:
+        case CHANGE_ALIAS:
+          executeBufferedOperation(nonCollisionCollections);
+          try {
             rocksDBManager.operation(plan);
-            break;
-          case CHANGE_TAG_OFFSET:
-          case CREATE_TEMPLATE:
-          case DROP_TEMPLATE:
-          case APPEND_TEMPLATE:
-          case PRUNE_TEMPLATE:
-          case SET_TEMPLATE:
-          case ACTIVATE_TEMPLATE:
-          case UNSET_TEMPLATE:
-          case CREATE_CONTINUOUS_QUERY:
-          case DROP_CONTINUOUS_QUERY:
-            logger.error("unsupported operations {}", plan.toString());
-            break;
-          default:
-            logger.error("Unrecognizable command {}", plan.getOperatorType());
-        }
-      } catch (MetadataException | IOException e) {
-        logger.error("Can not operate cmd {} for err:", plan.getOperatorType(), e);
-        if (!(e instanceof StorageGroupAlreadySetException)
-            && !(e instanceof PathAlreadyExistException)
-            && !(e instanceof AliasAlreadyExistException)) {
-          persistFailedLog(plan);
-        }
+          } catch (IOException e) {
+            rocksDBManager.operation(plan);
+          } catch (MetadataException e) {
+            logger.error("Can not operate cmd {} for err:", plan.getOperatorType(), e);
+          }
+          break;
+        case CHANGE_TAG_OFFSET:
+        case CREATE_TEMPLATE:
+        case DROP_TEMPLATE:
+        case APPEND_TEMPLATE:
+        case PRUNE_TEMPLATE:
+        case SET_TEMPLATE:
+        case ACTIVATE_TEMPLATE:
+        case UNSET_TEMPLATE:
+        case CREATE_CONTINUOUS_QUERY:
+        case DROP_CONTINUOUS_QUERY:
+          logger.error("unsupported operations {}", plan.toString());
+          break;
+        default:
+          logger.error("Unrecognizable command {}", plan.getOperatorType());
       }
     }
-    executeOperation(nonCollisionCollections, true);
+
+    executeBufferedOperation(nonCollisionCollections);
+
     if (retryPlans.size() > 0) {
-      executeOperation(retryPlans, false);
+      for (PhysicalPlan retryPlan : retryPlans) {
+        try {
+          rocksDBManager.operation(retryPlan);
+        } catch (IOException e) {
+          persistFailedLog(retryPlan);
+        } catch (MetadataException e) {
+          logger.error("Execute plan failed: {}", retryPlan.toString(), e);
+        } catch (Exception e) {
+          persistFailedLog(retryPlan);
+        }
+      }
     }
+    logger.info(
+        "Transfer data from mlog.bin complete after {}ms with {} errors",
+        System.currentTimeMillis() - time,
+        failedPlanCount.get());
   }
 
-  private void executeOperation(List<PhysicalPlan> plans, boolean needsToRetry) {
-    plans
-        .parallelStream()
-        .forEach(
-            x -> {
-              try {
-                rocksDBManager.operation(x);
-              } catch (IOException e) {
-                logger.error("failed to operate plan: {}", x.toString(), e);
-                retryPlans.add(x);
-              } catch (MetadataException e) {
-                logger.error("failed to operate plan: {}", x.toString(), e);
-                if (e instanceof AcquireLockTimeoutException && needsToRetry) {
-                  retryPlans.add(x);
-                } else {
-                  persistFailedLog(x);
-                }
-              } catch (Exception e) {
-                if (needsToRetry) {
-                  retryPlans.add(x);
-                } else {
-                  persistFailedLog(x);
-                }
-              }
-            });
-    logger.info("parallel executed {} operations", plans.size());
+  private void executeBufferedOperation(List<PhysicalPlan> plans)
+      throws ExecutionException, InterruptedException {
+    if (plans.size() <= 0) {
+      return;
+    }
+    forkJoinPool
+        .submit(
+            () -> {
+              plans
+                  .parallelStream()
+                  .forEach(
+                      x -> {
+                        try {
+                          rocksDBManager.operation(x);
+                        } catch (IOException e) {
+                          retryPlans.add(x);
+                        } catch (MetadataException e) {
+                          if (e instanceof AcquireLockTimeoutException) {
+                            retryPlans.add(x);
+                          } else {
+                            logger.error("Execute plan failed: {}", x.toString(), e);
+                          }
+                        } catch (Exception e) {
+                          retryPlans.add(x);
+                        }
+                      });
+            })
+        .get();
+    logger.debug("parallel executed {} operations", plans.size());
     plans.clear();
   }
 
   private void persistFailedLog(PhysicalPlan plan) {
-    logger.info("persist won't retry and failed plan: {}", plan.toString());
+    logger.info("persist failed plan: {}", plan.toString());
     failedPlanCount.incrementAndGet();
     try {
       switch (plan.getOperatorType()) {
@@ -275,20 +300,15 @@ public class MetaDataTransfer {
       }
     } catch (IOException e) {
       logger.error(
-          "fatal error, exception when persist failed plan, metadata transfer should be failed", e);
-    }
-  }
-
-  public void transferFromSnapshot(File mtreeSnapshot) {
-    try (MLogReader mLogReader = new MLogReader(mtreeSnapshot)) {
-      doTransferFromSnapshot(mLogReader);
-    } catch (IOException | MetadataException e) {
-      logger.warn("Failed to deserialize from {}. Use a new MTree.", mtreeSnapshot.getPath());
+          "Fatal error, exception when persist failed plan, metadata transfer should be failed", e);
+      throw new RuntimeException("Terminate transfer as persist log fail.");
     }
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  private void doTransferFromSnapshot(MLogReader mLogReader) throws IOException, MetadataException {
+  private void doTransferFromSnapshot()
+      throws IOException, MetadataException, ExecutionException, InterruptedException {
+    logger.info("Start transfer data from snapshot");
     long start = System.currentTimeMillis();
     MTree mTree = new MTree();
     mTree.init();
@@ -316,15 +336,17 @@ public class MetaDataTransfer {
             });
 
     if (errorCount.get() > 0) {
-      logger.info("Fatal error. create some storage groups fail, terminate metadata transfer");
+      logger.error("Fatal error. Create some storage groups fail, terminate metadata transfer");
       return;
     }
 
     List<IMeasurementMNode> measurementMNodes = new ArrayList<>();
 
+    IMNode root = mTree.getNodeByPath(new PartialPath(new String[] {"root"}));
+    PartialPath matchAllPath = new PartialPath(new String[] {"root", "**"});
+
     MeasurementCollector collector =
-        new MeasurementCollector(
-            mTree.getNodeByPath(new PartialPath("root")), new PartialPath("root.**"), -1, -1) {
+        new MeasurementCollector(root, matchAllPath, -1, -1) {
           @Override
           protected void collectMeasurement(IMeasurementMNode node) throws MetadataException {
             measurementMNodes.add(node);
@@ -332,33 +354,70 @@ public class MetaDataTransfer {
         };
     collector.traverse();
 
-    measurementMNodes
-        .parallelStream()
-        .forEach(
-            mNode -> {
-              try {
-                rocksDBManager.createTimeSeries(
-                    mNode.getPartialPath(), mNode.getSchema(), mNode.getAlias(), null, null);
-              } catch (AcquireLockTimeoutException e) {
+    Queue<IMeasurementMNode> failCreatedNodes = new ConcurrentLinkedQueue<>();
+    AtomicInteger createdNodeCnt = new AtomicInteger(0);
+    AtomicInteger lastValue = new AtomicInteger(-1);
+    new Thread(
+            () -> {
+              while (lastValue.get() < createdNodeCnt.get()) {
+                try {
+                  lastValue.set(createdNodeCnt.get());
+                  Thread.sleep(10 * 1000);
+                  logger.info("created count: {}", createdNodeCnt.get());
+                } catch (InterruptedException e) {
+                  logger.error("timer thread error", e);
+                }
+              }
+            })
+        .start();
+
+    forkJoinPool
+        .submit(
+            () ->
+                measurementMNodes
+                    .parallelStream()
+                    .forEach(
+                        mNode -> {
+                          try {
+                            rocksDBManager.createTimeSeries(
+                                mNode.getPartialPath(),
+                                mNode.getSchema(),
+                                mNode.getAlias(),
+                                null,
+                                null);
+                            createdNodeCnt.incrementAndGet();
+                          } catch (AcquireLockTimeoutException e) {
+                            failCreatedNodes.add(mNode);
+                          } catch (MetadataException e) {
+                            logger.error(
+                                "create timeseries {} failed",
+                                mNode.getPartialPath().getFullPath(),
+                                e);
+                            errorCount.incrementAndGet();
+                          }
+                        }))
+        .get();
+
+    if (!failCreatedNodes.isEmpty()) {
+      failCreatedNodes.stream()
+          .forEach(
+              mNode -> {
                 try {
                   rocksDBManager.createTimeSeries(
                       mNode.getPartialPath(), mNode.getSchema(), mNode.getAlias(), null, null);
-                } catch (MetadataException metadataException) {
+                  createdNodeCnt.incrementAndGet();
+                } catch (Exception e) {
                   logger.error(
                       "create timeseries {} failed in retry",
                       mNode.getPartialPath().getFullPath(),
                       e);
                   errorCount.incrementAndGet();
                 }
-              } catch (MetadataException e) {
-                logger.error(
-                    "create timeseries {} failed", mNode.getPartialPath().getFullPath(), e);
-                errorCount.incrementAndGet();
-              }
-            });
+              });
+    }
 
     logger.info(
-        "metadata snapshot transfer complete after {}ms with {} errors",
+        "Transfer data from snapshot complete after {}ms with {} errors",
         System.currentTimeMillis() - start,
         errorCount.get());
   }
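[editor note] Both executeBufferedOperation and the snapshot path rely on the same trick: a parallel stream started from inside a task submitted to a dedicated ForkJoinPool runs its work on that pool rather than on the JVM-wide common pool, which is how the transfer bounds its parallelism at DEFAULT_TRANSFER_THREAD_POOL_SIZE while keeping .get() as the barrier for each batch. A stripped-down sketch of the pattern; the println stands in for rocksDBManager.operation.

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;

    // Dedicated-pool + parallelStream sketch.
    public class DedicatedPoolSketch {
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(8); // 200 in the commit
        List<String> plans = Arrays.asList("plan-1", "plan-2", "plan-3");
        pool.submit(
                () ->
                    plans
                        .parallelStream()
                        .forEach(
                            p -> System.out.println(Thread.currentThread().getName() + " -> " + p)))
            .get(); // block until the whole batch is done, as the transfer does
        pool.shutdown();
      }
    }
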
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 00d2e30..cb27781 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.IMetaManager;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.MetadataManagerType;
 import org.apache.iotdb.db.metadata.rocksdb.MRocksDBManager;
 import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
 import org.apache.iotdb.db.protocol.rest.RestService;
@@ -78,7 +79,14 @@ public class IoTDB implements IoTDBMBean {
     }
 
     try {
-      metaManager = new MRocksDBManager();
+      if (IoTDBDescriptor.getInstance().getConfig().getMetadataManagerType()
+          == MetadataManagerType.ROCKSDB_MANAGER) {
+        metaManager = new MRocksDBManager();
+        logger.info("Use MRocksDBManager to manage metadata");
+      } else {
+        metaManager = MManager.getInstance();
+        logger.info("Use MManager to manage metadata");
+      }
     } catch (Exception e) {
       logger.error("create meta manager fail", e);
       System.exit(1);

[iotdb] 25/45: Fix a bug where the result is invalid

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 399452f6cb7e229a5551b83e237d93c2fc891090
Author: lisijia <li...@360.cn>
AuthorDate: Thu Mar 10 16:53:54 2022 +0800

    Fix a bug where the result is invalid
---
 .../iotdb/db/metadata/rocksdb/GetBelongedToSpecifiedType.java      | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/GetBelongedToSpecifiedType.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/GetBelongedToSpecifiedType.java
index 61535ac..023aaeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/GetBelongedToSpecifiedType.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/GetBelongedToSpecifiedType.java
@@ -22,12 +22,13 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.utils.MetaUtils;
 
+import org.rocksdb.RocksDBException;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
-import org.rocksdb.RocksDBException;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
@@ -76,10 +77,10 @@ public class GetBelongedToSpecifiedType {
     contextNodes.add(nodes[idx]);
     String innerName =
         RocksDBUtils.convertPartialPathToInnerByNodes(
-            contextNodes.toArray(new String[0]), contextNodes.size(), nodeType);
+            contextNodes.toArray(new String[0]), contextNodes.size() - 1, nodeType);
     byte[] queryResult = readWriteHandler.get(null, innerName.getBytes());
     if (queryResult != null) {
-      allResult.add(new PartialPath(new String(queryResult)));
+      allResult.add(new PartialPath(RocksDBUtils.concatNodesName(nodes, 0, idx)));
     }
   }
 

[iotdb] 26/45: fix a bug in wildcard processing

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9ae48a2381afb5a5e2120a9d2fa45daa252fe748
Author: lisijia <li...@360.cn>
AuthorDate: Thu Mar 10 17:51:01 2022 +0800

    fix a bug in wildcard processing
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 29 +++++++++++++++++++---
 1 file changed, 26 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index e2375c3..695e0bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -123,7 +123,18 @@ import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ALL_NODE_TYPE_ARRAY;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_NODE_VALUE;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_SCHEMA;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 /**
@@ -873,7 +884,7 @@ public class MRocksDBManager implements IMetaManager {
                 // wait for all executing createTimeseries operations are complete
                 Thread.sleep(MAX_LOCK_WAIT_TIME * MAX_PATH_DEPTH);
                 String[] nodes = path.getNodes();
-                Arrays.asList(RockDBConstants.ALL_NODE_TYPE_ARRAY).stream()
+                Arrays.asList(ALL_NODE_TYPE_ARRAY).stream()
                     .parallel()
                     .forEach(
                         type -> {
@@ -995,6 +1006,11 @@ public class MRocksDBManager implements IMetaManager {
       Character[] nodeTypeArray)
       throws IllegalPathException {
     List<String[]> allNodesArray = RocksDBUtils.replaceMultiWildcardToSingle(nodes, maxLevel);
+    try {
+      readWriteHandler.scanAllKeys("E:\\ideaProject\\rel-360\\all.txt");
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
     allNodesArray.parallelStream().forEach(x -> traverseByPatternPath(x, function, nodeTypeArray));
   }
 
@@ -1037,6 +1053,12 @@ public class MRocksDBManager implements IMetaManager {
       int level = nextFirstWildcardIndex - 1;
 
       boolean lastIteration = nextFirstWildcardIndex >= nodes.length;
+      Character[] nodeType;
+      if (!lastIteration) {
+        nodeType = ALL_NODE_TYPE_ARRAY;
+      } else {
+        nodeType = nodeTypeArray;
+      }
 
       Queue<String[]> tempNodes = new ConcurrentLinkedQueue<>();
       byte[] suffixToMatch =
@@ -1049,7 +1071,7 @@ public class MRocksDBManager implements IMetaManager {
               prefixNodes -> {
                 String levelPrefix =
                     RocksDBUtils.getLevelPathPrefix(prefixNodes, prefixNodes.length - 1, level);
-                Arrays.stream(nodeTypeArray)
+                Arrays.stream(nodeType)
                     .parallel()
                     .forEach(
                         x -> {
@@ -1817,6 +1839,7 @@ public class MRocksDBManager implements IMetaManager {
   /** Get storage group node by path. the give path don't need to be storage group path. */
   @Override
   public IStorageGroupMNode getStorageGroupNodeByPath(PartialPath path) throws MetadataException {
+    ensureStorageGroup(path, path.getNodeLength() - 1);
     IStorageGroupMNode node = null;
     try {
       String[] nodes = path.getNodes();
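[editor note] The nodeType switch above is the substance of this fix: while the traversal is still in the middle of the pattern, a scanned key can be stored under any node-type prefix, so intermediate levels are probed with ALL_NODE_TYPE_ARRAY and only the final level is restricted to the node types the caller asked for. For a count such as getAllTimeseriesCount over root.*.d1.*, for example, the first * must be allowed to match a storage-group or entity key even though only measurement keys are counted at the last *.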

[iotdb] 12/45: support showTimeseries

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bfb0124314dd6f9d596a4eaf08af94cdcabda654
Author: lisijia <li...@360.cn>
AuthorDate: Tue Mar 8 17:00:45 2022 +0800

    support showTimeseries
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 64 +++++++++++++++++++++-
 1 file changed, 63 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index a705ae6..47b8dbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -102,6 +102,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -1604,9 +1605,70 @@ public class MRocksDBManager implements IMetaManager {
   @Override
   public List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan, QueryContext context)
       throws MetadataException {
-    return null;
+    if (plan.getKey() != null && plan.getValue() != null) {
+      return showTimeseriesWithIndex(plan, context);
+    } else {
+      return showTimeseriesWithoutIndex(plan, context);
+    }
   }
 
+  private List<ShowTimeSeriesResult> showTimeseriesWithIndex(
+      ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
+    // temporarily unsupported
+    return Collections.emptyList();
+  }
+
+  private List<ShowTimeSeriesResult> showTimeseriesWithoutIndex(
+      ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
+    List<Pair<PartialPath, String[]>> ans;
+    if (plan.isOrderByHeat()) {
+      ans = Collections.emptyList();
+    } else {
+      ans = getAllMeasurementSchema(plan);
+    }
+    List<ShowTimeSeriesResult> res = new LinkedList<>();
+    for (Pair<PartialPath, String[]> ansString : ans) {
+      Pair<Map<String, String>, Map<String, String>> tagAndAttributePair =
+          new Pair<>(Collections.emptyMap(), Collections.emptyMap());
+      res.add(
+          new ShowTimeSeriesResult(
+              ansString.left.getFullPath(),
+              ansString.right[0],
+              ansString.right[1],
+              TSDataType.valueOf(ansString.right[2]),
+              TSEncoding.valueOf(ansString.right[3]),
+              CompressionType.valueOf(ansString.right[4]),
+              ansString.right[6] != null ? Long.parseLong(ansString.right[6]) : 0,
+              tagAndAttributePair.left,
+              tagAndAttributePair.right));
+    }
+    return res;
+  }
+
+  private List<Pair<PartialPath, String[]>> getAllMeasurementSchema(ShowTimeSeriesPlan plan)
+      throws MetadataException {
+    List<MeasurementPath> measurementPaths = getMatchedMeasurementPath(plan.getPath().getNodes());
+    List<Pair<PartialPath, String[]>> result = Collections.synchronizedList(new LinkedList<>());
+    measurementPaths
+        .parallelStream()
+        .forEach(
+            measurementPath -> {
+              String[] tsRow = new String[7];
+              // todo need update these properties
+              tsRow[0] = measurementPath.getMeasurementAlias();
+              // sg name
+              tsRow[1] = measurementPath.getFullPath();
+              tsRow[2] = measurementPath.getMeasurementSchema().getType().toString();
+              tsRow[3] = measurementPath.getMeasurementSchema().getEncodingType().toString();
+              tsRow[4] = measurementPath.getMeasurementSchema().getCompressor().toString();
+              tsRow[5] = String.valueOf(0);
+              tsRow[6] = null;
+              Pair<PartialPath, String[]> temp = new Pair<>(measurementPath, tsRow);
+              result.add(temp);
+            });
+
+    return result;
+  }
   /**
    * Get series type for given seriesPath.
    *

[iotdb] 05/45: fix a bug where the number of nodes is incorrect

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 68f9c0d5d2d6cf8a6b2b1d59d88075fe9577383a
Author: lisijia <li...@360.cn>
AuthorDate: Fri Mar 4 11:33:28 2022 +0800

    fix a bug where the number of nodes is incorrect
---
 .../java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java | 3 ++-
 .../java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java    | 7 ++++---
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 603c07f..0896d34 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -958,7 +958,8 @@ public class MRocksDBManager implements IMetaManager {
   }
 
   public void traverseOutcomeBasins(
-      String[] nodes, int maxLevel, Consumer<String> consumer, Character[] nodeTypeArray) {
+      String[] nodes, int maxLevel, Consumer<String> consumer, Character[] nodeTypeArray)
+      throws IllegalPathException {
     List<String[]> allNodesArray = RocksDBUtils.replaceMultiWildcardToSingle(nodes, maxLevel);
     allNodesArray.parallelStream().forEach(x -> traverseByPatternPath(x, consumer, nodeTypeArray));
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index 1e9e365..c4f4e17 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -510,7 +510,8 @@ public class RocksDBUtils {
   }
 
   // eg. root.a.*.**.b.**.c
-  public static List<String[]> replaceMultiWildcardToSingle(String[] nodes, int maxLevel) {
+  public static List<String[]> replaceMultiWildcardToSingle(String[] nodes, int maxLevel)
+      throws IllegalPathException {
     List<String[]> allNodesArray = new ArrayList<>();
     List<Integer> multiWildcardPosition = new ArrayList<>();
     for (int i = 0; i < nodes.length; i++) {
@@ -524,7 +525,7 @@ public class RocksDBUtils {
       for (int i = 1; i <= maxLevel - nodes.length + 2; i++) {
         String[] clone = nodes.clone();
         clone[multiWildcardPosition.get(0)] = replaceWildcard(i);
-        allNodesArray.add(clone);
+        allNodesArray.add(RocksDBUtils.newStringArray(clone));
       }
     } else {
       for (int sum = multiWildcardPosition.size();
@@ -536,7 +537,7 @@ public class RocksDBUtils {
           for (int i = 0; i < value.length; i++) {
             clone[multiWildcardPosition.get(i)] = replaceWildcard(value[i]);
           }
-          allNodesArray.add(clone);
+          allNodesArray.add(RocksDBUtils.newStringArray(clone));
         }
       }
     }
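[editor note] The two changed lines route every expanded pattern through RocksDBUtils.newStringArray before it is traversed. That matters because replaceWildcard(i) puts a whole run such as *.* into a single array slot; joining the nodes back into one string and re-splitting them (MetaUtils.splitPathToDetachedPath in the real code) turns that run back into individual path nodes, which is what the node counting relies on. A plain-String sketch of the effect, with String.split standing in for the real splitter and '.' for the separator:

    import java.util.Arrays;

    // Why the expanded arrays need a join-and-resplit pass.
    public class ResplitSketch {
      public static void main(String[] args) {
        String[] clone = {"root", "sg", "*.*", "s1"};       // after wildcard substitution
        String joined = String.join(".", clone);             // root.sg.*.*.s1
        String[] detached = joined.split("\\.");             // 5 nodes, not 4
        System.out.println(clone.length + " -> " + detached.length); // 4 -> 5
        System.out.println(Arrays.toString(detached));
      }
    }
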

[iotdb] 34/45: [rocksdb] fix build break

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a76b5f19fdad89e1d71a8a3353b587f90b091b63
Author: chengjianyun <ch...@360.cn>
AuthorDate: Fri Mar 11 19:15:27 2022 +0800

    [rocksdb] fix build break
---
 .../apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java   | 13 +------------
 1 file changed, 1 insertion(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index ceb8f40..1e3d866 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -122,18 +122,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ALL_NODE_TYPE_ARRAY;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_NODE_VALUE;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_SCHEMA;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 /**

[iotdb] 02/45: Merge branch 'rocksdb/dev' of v.src.corp.qihoo.net:xt_hadoop/iotdb into rocksdb/dev

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit abc659c8d2f15986f3463d1d7ca1620637392d23
Merge: 5d015a1 3fc653a
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 2 10:49:44 2022 +0800

    Merge branch 'rocksdb/dev' of v.src.corp.qihoo.net:xt_hadoop/iotdb into rocksdb/dev

 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 69 +++++-----------------
 1 file changed, 15 insertions(+), 54 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 37821ec,703fb4f..e009dfc
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@@ -109,10 -109,9 +109,11 @@@ import java.util.Stack
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentLinkedQueue;
  import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.locks.Lock;
 +import java.util.function.Consumer;
  import java.util.function.Function;
+ import java.util.stream.Collectors;
  
  import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
  import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;

[iotdb] 41/45: support show child

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4a39a9edcea6baa4be95e7da83405753d8311653
Author: lisijia <li...@360.cn>
AuthorDate: Wed Mar 16 16:45:24 2022 +0800

    support show child
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 27 +++++++++++++++-------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index bf6ccfa..a3109ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -1264,16 +1264,27 @@ public class MRocksDBManager implements IMetaManager {
    */
   @Override
   public Set<String> getChildNodePathInNextLevel(PartialPath pathPattern) throws MetadataException {
-    Set<String> result = new HashSet<>();
+    // todo support wildcard
+    if (pathPattern.getFullPath().contains(ONE_LEVEL_PATH_WILDCARD)) {
+      throw new MetadataException(
+          "Wildcards are not currently supported for this operation [SHOW CHILD PATHS pathPattern].");
+    }
+    Set<String> result = Collections.synchronizedSet(new HashSet<>());
     String innerNameByLevel =
         RocksDBUtils.getLevelPath(
-            pathPattern.getNodes(),
-            pathPattern.getNodeLength() - 1,
-            pathPattern.getNodeLength() + 1);
-    Set<String> allKeyByPrefix = readWriteHandler.getKeyByPrefix(innerNameByLevel);
-    for (String str : allKeyByPrefix) {
-      result.add(RocksDBUtils.getPathByInnerName(str));
-    }
+                pathPattern.getNodes(),
+                pathPattern.getNodeLength() - 1,
+                pathPattern.getNodeLength())
+            + RockDBConstants.PATH_SEPARATOR
+            + pathPattern.getNodeLength();
+    Arrays.stream(ALL_NODE_TYPE_ARRAY)
+        .parallel()
+        .forEach(
+            x -> {
+              for (String string : readWriteHandler.getKeyByPrefix(x + innerNameByLevel)) {
+                result.add(RocksDBUtils.getPathByInnerName(string));
+              }
+            });
     return result;
   }
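[editor note] With the rewrite above, SHOW CHILD PATHS builds one level-qualified prefix for the level directly below the given path and probes it once per entry in ALL_NODE_TYPE_ARRAY in parallel, gathering hits into a synchronized set, so a child is reported no matter which node-type prefix it happens to be stored under; patterns containing wildcards are rejected up front until the traversal supports them.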