You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/17 10:32:07 UTC

[iotdb] branch QueryImprove created (now cadd8512c5)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch QueryImprove
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at cadd8512c5 use partitionLatestFlushedTimeForEachDevice instead of newlyFlushedPartitionLatestFlushedTimeForEachDevice

This branch includes the following new commits:

     new 4b0c9843f2 Filter unsatisfied TimePartition while preparing resource list
     new 991124ad45 Filter unsatisfied TimePartition while preparing resource list
     new 593357aa1f finish
     new cadd8512c5 use partitionLatestFlushedTimeForEachDevice instead of newlyFlushedPartitionLatestFlushedTimeForEachDevice

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/04: Filter unsatisfied TimePartition while preparing resource list

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryImprove
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4b0c9843f2990062da101a8e1eec7979d577bb94
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Mar 17 14:28:35 2023 +0800

    Filter unsatisfied TimePartition while preparing resource list
---
 .../apache/iotdb/db/engine/storagegroup/DataRegion.java  |  5 +++--
 .../db/engine/storagegroup/HashLastFlushTimeMap.java     | 12 ++++++++++++
 .../db/engine/storagegroup/IDTableLastFlushTimeMap.java  |  6 ++++++
 .../iotdb/db/engine/storagegroup/ILastFlushTimeMap.java  |  3 +++
 .../iotdb/db/engine/storagegroup/TsFileManager.java      | 16 ++++++++++++++++
 5 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 9fe0e3346f..1864b0425f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1700,9 +1700,10 @@ public class DataRegion implements IDataRegionForQuery {
       List<PartialPath> pathList, String singleDeviceId, QueryContext context, Filter timeFilter)
       throws QueryProcessException {
     try {
+      List<Long> timePartitions = lastFlushTimeMap.getAllSatisfiedTimePartitions(singleDeviceId);
       List<TsFileResource> seqResources =
           getFileResourceListForQuery(
-              tsFileManager.getTsFileList(true),
+              tsFileManager.getTsFileList(timePartitions, true),
               upgradeSeqFileList,
               pathList,
               singleDeviceId,
@@ -1711,7 +1712,7 @@ public class DataRegion implements IDataRegionForQuery {
               true);
       List<TsFileResource> unseqResources =
           getFileResourceListForQuery(
-              tsFileManager.getTsFileList(false),
+              tsFileManager.getTsFileList(timePartitions, false),
               upgradeUnseqFileList,
               pathList,
               singleDeviceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
index 50d524a3e5..d34a52cdad 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
@@ -22,9 +22,11 @@ package org.apache.iotdb.db.engine.storagegroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class HashLastFlushTimeMap implements ILastFlushTimeMap {
 
@@ -245,4 +247,14 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap {
     }
     return 0;
   }
+
+  @Override
+  public List<Long> getAllSatisfiedTimePartitions(String deviceId) {
+    return deviceId == null
+        ? new ArrayList<>(newlyFlushedPartitionLatestFlushedTimeForEachDevice.keySet())
+        : newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet().stream()
+            .filter(entry -> entry.getValue().containsKey(deviceId))
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
index 03dd87476a..3a56a2dd41 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.engine.storagegroup;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -184,4 +185,9 @@ public class IDTableLastFlushTimeMap implements ILastFlushTimeMap {
     }
     return 0;
   }
+
+  @Override
+  public List<Long> getAllSatisfiedTimePartitions(String deviceId) {
+    return new ArrayList<>(partitionSet);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
index f344b73f31..0da4674073 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.engine.storagegroup;
 
+import java.util.List;
 import java.util.Map;
 
 /** This interface manages last time and flush time for sequence and unsequence determination */
@@ -70,4 +71,6 @@ public interface ILastFlushTimeMap {
   void removePartition(long partitionId);
 
   long getMemSize(long partitionId);
+
+  List<Long> getAllSatisfiedTimePartitions(String deviceId);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index 86fa5e742e..ffd38cbde2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -85,6 +85,22 @@ public class TsFileManager {
     }
   }
 
+  public List<TsFileResource> getTsFileList(List<Long> timePartitions, boolean sequence) {
+    // Hold the read lock: iterating the ConcurrentSkipListMap is not concurrency-safe.
+    // NOTE(review): chosenMap.get() is null for a partition absent from this map - confirm.
+    readLock();
+    try {
+      List<TsFileResource> allResources = new ArrayList<>();
+      Map<Long, TsFileResourceList> chosenMap = sequence ? sequenceFiles : unsequenceFiles;
+      for (Long timePartition : timePartitions) {
+        allResources.addAll(chosenMap.get(timePartition).getArrayList());
+      }
+      return allResources;
+    } finally {
+      readUnlock();
+    }
+  }
+
   public TsFileResourceList getOrCreateSequenceListByTimePartition(long timePartition) {
     writeLock("getOrCreateSequenceListByTimePartition");
     try {


[iotdb] 02/04: Filter unsatisfied TimePartition while preparing resource list

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryImprove
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 991124ad4523a74cfadb93a9017168aa16f46d9b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Mar 17 15:20:14 2023 +0800

    Filter unsatisfied TimePartition while preparing resource list
---
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  3 +-
 .../engine/storagegroup/HashLastFlushTimeMap.java  | 35 +++++++++++++++++-----
 .../storagegroup/IDTableLastFlushTimeMap.java      |  3 +-
 .../db/engine/storagegroup/ILastFlushTimeMap.java  |  4 ++-
 .../apache/iotdb/db/utils/TimePartitionUtils.java  | 11 +++++++
 5 files changed, 45 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 1864b0425f..cf994427db 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1700,7 +1700,8 @@ public class DataRegion implements IDataRegionForQuery {
       List<PartialPath> pathList, String singleDeviceId, QueryContext context, Filter timeFilter)
       throws QueryProcessException {
     try {
-      List<Long> timePartitions = lastFlushTimeMap.getAllSatisfiedTimePartitions(singleDeviceId);
+      List<Long> timePartitions =
+          lastFlushTimeMap.getAllSatisfiedTimePartitions(singleDeviceId, timeFilter);
       List<TsFileResource> seqResources =
           getFileResourceListForQuery(
               tsFileManager.getTsFileList(timePartitions, true),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
index d34a52cdad..5c337d4709 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
@@ -19,10 +19,12 @@
 
 package org.apache.iotdb.db.engine.storagegroup;
 
+import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -249,12 +251,29 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap {
   }
 
   @Override
-  public List<Long> getAllSatisfiedTimePartitions(String deviceId) {
-    return deviceId == null
-        ? new ArrayList<>(newlyFlushedPartitionLatestFlushedTimeForEachDevice.keySet())
-        : newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet().stream()
-            .filter(entry -> entry.getValue().containsKey(deviceId))
-            .map(Map.Entry::getKey)
-            .collect(Collectors.toList());
+  public List<Long> getAllSatisfiedTimePartitions(String deviceId, Filter timeFilter) {
+    if (deviceId == null) {
+      return newlyFlushedPartitionLatestFlushedTimeForEachDevice.keySet().stream()
+          .filter(
+              stringLongMap -> {
+                long[] startAndEndTime =
+                    TimePartitionUtils.getStartAndEndTimeForTimePartition(stringLongMap);
+                return (timeFilter == null
+                    || timeFilter.satisfy(startAndEndTime[0], startAndEndTime[1]));
+              })
+          .collect(Collectors.toList());
+    } else {
+      return newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet().stream()
+          .filter(
+              entry -> {
+                long[] startAndEndTime =
+                    TimePartitionUtils.getStartAndEndTimeForTimePartition(entry.getKey());
+                return (timeFilter == null
+                        || timeFilter.satisfy(startAndEndTime[0], startAndEndTime[1]))
+                    && entry.getValue().containsKey(deviceId);
+              })
+          .map(Map.Entry::getKey)
+          .collect(Collectors.toList());
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
index 3a56a2dd41..5915839319 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.storagegroup;
 
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -187,7 +188,7 @@ public class IDTableLastFlushTimeMap implements ILastFlushTimeMap {
   }
 
   @Override
-  public List<Long> getAllSatisfiedTimePartitions(String deviceId) {
+  public List<Long> getAllSatisfiedTimePartitions(String deviceId, Filter timeFilter) {
     return new ArrayList<>(partitionSet);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
index 0da4674073..dbd617b2e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.engine.storagegroup;
 
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
 import java.util.List;
 import java.util.Map;
 
@@ -72,5 +74,5 @@ public interface ILastFlushTimeMap {
 
   long getMemSize(long partitionId);
 
-  List<Long> getAllSatisfiedTimePartitions(String deviceId);
+  List<Long> getAllSatisfiedTimePartitions(String deviceId, Filter timeFilter);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
index 763b7e0d5a..2c21eb7d79 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
@@ -38,6 +38,17 @@ public class TimePartitionUtils {
     return timePartitionInterval;
   }
 
+  /**
+   * Get the start time and the inclusive end time covered by one time partition.
+   *
+   * @param timePartition time partition id
+   * @return long[2] {startTime, endTime}, where endTime is the last timestamp in the partition
+   */
+  public static long[] getStartAndEndTimeForTimePartition(long timePartition) {
+    long startTime = timePartition * timePartitionInterval;
+    return new long[] {startTime, startTime + timePartitionInterval - 1};
+  }
+
   @TestOnly
   public static void setTimePartitionInterval(long timePartitionInterval) {
     TimePartitionUtils.timePartitionInterval = timePartitionInterval;


[iotdb] 04/04: use partitionLatestFlushedTimeForEachDevice instead of newlyFlushedPartitionLatestFlushedTimeForEachDevice

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryImprove
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cadd8512c534252f9fb97f606122cc922b5571c4
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Mar 17 18:31:49 2023 +0800

    use partitionLatestFlushedTimeForEachDevice instead of newlyFlushedPartitionLatestFlushedTimeForEachDevice
---
 .../org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
index 5c337d4709..dbcfe60f31 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
@@ -253,7 +253,7 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap {
   @Override
   public List<Long> getAllSatisfiedTimePartitions(String deviceId, Filter timeFilter) {
     if (deviceId == null) {
-      return newlyFlushedPartitionLatestFlushedTimeForEachDevice.keySet().stream()
+      return partitionLatestFlushedTimeForEachDevice.keySet().stream()
           .filter(
               stringLongMap -> {
                 long[] startAndEndTime =
@@ -263,7 +263,7 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap {
               })
           .collect(Collectors.toList());
     } else {
-      return newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet().stream()
+      return partitionLatestFlushedTimeForEachDevice.entrySet().stream()
           .filter(
               entry -> {
                 long[] startAndEndTime =


[iotdb] 03/04: finish

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryImprove
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 593357aa1f1ea6e651bce87e5c8d88466933adae
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Fri Mar 17 17:32:45 2023 +0800

    finish
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../statemachine/ConfigRegionStateMachine.java     |  3 +-
 .../manager/consensus/ConsensusManager.java        | 58 ++++++++++++++++------
 2 files changed, 45 insertions(+), 16 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index b6617fd6de..9e93061cf3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
 import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
 import org.apache.iotdb.confignode.writelog.io.SingleFileLogReader;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -76,7 +77,7 @@ public class ConfigRegionStateMachine
   private int endIndex;
 
   private static final String CURRENT_FILE_DIR =
-      CONF.getConsensusDir() + File.separator + "simple" + File.separator + "current";
+      ConsensusManager.getConfigRegionDir() + File.separator + "current";
   private static final String PROGRESS_FILE_PATH =
       CURRENT_FILE_DIR + File.separator + "log_inprogress_";
   private static final String FILE_PATH = CURRENT_FILE_DIR + File.separator + "log_";
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index cca0c80433..16302fac47 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -47,6 +47,7 @@ import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -61,10 +62,11 @@ public class ConsensusManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusManager.class);
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
   private static final int SEED_CONFIG_NODE_ID = 0;
+  /** There is only one ConfigNodeGroup, so its consensus group id is a constant. */
+  public static final ConsensusGroupId DEFAULT_CONSENSUS_GROUP_ID =
+      new ConfigRegionId(CONF.getConfigRegionId());
 
   private final IManager configManager;
-
-  private ConsensusGroupId consensusGroupId;
   private IConsensus consensusImpl;
 
   public ConsensusManager(IManager configManager, ConfigRegionStateMachine stateMachine)
@@ -79,10 +81,9 @@ public class ConsensusManager {
 
   /** ConsensusLayer local implementation. */
   private void setConsensusLayer(ConfigRegionStateMachine stateMachine) throws IOException {
-    // There is only one ConfigNodeGroup
-    consensusGroupId = new ConfigRegionId(CONF.getConfigRegionId());
 
     if (SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
+      upgrade();
       consensusImpl =
           ConsensusFactory.getConsensusImpl(
                   SIMPLE_CONSENSUS,
@@ -213,6 +214,25 @@ public class ConsensusManager {
     }
   }
 
+  /**
+   * In version 1.1, we fixed a 1.0 SimpleConsensus bug that incorrectly set the consensus
+   * directory. For backward compatibility, we added this function, which we may remove in version
+   * 2.x
+   */
+  private void upgrade() {
+    File consensusDir = new File(CONF.getConsensusDir());
+    if (consensusDir.exists()) {
+      File oldWalDir = new File(consensusDir, "simple");
+      if (oldWalDir.exists()) {
+        if (!oldWalDir.renameTo(new File(getConfigRegionDir()))) {
+          LOGGER.warn(
+              "upgrade ConfigNode consensus wal dir for SimpleConsensus from version/1.0 to version/1.1 failed, "
+                  + "you maybe need to rename the simple dir to 0_0 manually.");
+        }
+      }
+    }
+  }
+
   /**
    * Create peer in new node to build consensus group.
    *
@@ -225,11 +245,11 @@ public class ConsensusManager {
     for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
       peerList.add(
           new Peer(
-              consensusGroupId,
+              DEFAULT_CONSENSUS_GROUP_ID,
               configNodeLocation.getConfigNodeId(),
               configNodeLocation.getConsensusEndPoint()));
     }
-    consensusImpl.createPeer(consensusGroupId, peerList);
+    consensusImpl.createPeer(DEFAULT_CONSENSUS_GROUP_ID, peerList);
   }
 
   /**
@@ -242,9 +262,9 @@ public class ConsensusManager {
     boolean result =
         consensusImpl
             .addPeer(
-                consensusGroupId,
+                DEFAULT_CONSENSUS_GROUP_ID,
                 new Peer(
-                    consensusGroupId,
+                    DEFAULT_CONSENSUS_GROUP_ID,
                     configNodeLocation.getConfigNodeId(),
                     configNodeLocation.getConsensusEndPoint()))
             .isSuccess();
@@ -264,9 +284,9 @@ public class ConsensusManager {
   public boolean removeConfigNodePeer(TConfigNodeLocation configNodeLocation) {
     return consensusImpl
         .removePeer(
-            consensusGroupId,
+            DEFAULT_CONSENSUS_GROUP_ID,
             new Peer(
-                consensusGroupId,
+                DEFAULT_CONSENSUS_GROUP_ID,
                 configNodeLocation.getConfigNodeId(),
                 configNodeLocation.getConsensusEndPoint()))
         .isSuccess();
@@ -274,22 +294,22 @@ public class ConsensusManager {
 
   /** Transmit PhysicalPlan to confignode.consensus.statemachine */
   public ConsensusWriteResponse write(ConfigPhysicalPlan plan) {
-    return consensusImpl.write(consensusGroupId, plan);
+    return consensusImpl.write(DEFAULT_CONSENSUS_GROUP_ID, plan);
   }
 
   /** Transmit PhysicalPlan to confignode.consensus.statemachine */
   public ConsensusReadResponse read(ConfigPhysicalPlan plan) {
-    return consensusImpl.read(consensusGroupId, plan);
+    return consensusImpl.read(DEFAULT_CONSENSUS_GROUP_ID, plan);
   }
 
   public boolean isLeader() {
-    return consensusImpl.isLeader(consensusGroupId);
+    return consensusImpl.isLeader(DEFAULT_CONSENSUS_GROUP_ID);
   }
 
   /** @return ConfigNode-leader's location if leader exists, null otherwise. */
   public TConfigNodeLocation getLeader() {
     for (int retry = 0; retry < 50; retry++) {
-      Peer leaderPeer = consensusImpl.getLeader(consensusGroupId);
+      Peer leaderPeer = consensusImpl.getLeader(DEFAULT_CONSENSUS_GROUP_ID);
       if (leaderPeer != null) {
         List<TConfigNodeLocation> registeredConfigNodes =
             getNodeManager().getRegisteredConfigNodes();
@@ -338,7 +358,15 @@ public class ConsensusManager {
   }
 
   public ConsensusGroupId getConsensusGroupId() {
-    return consensusGroupId;
+    return DEFAULT_CONSENSUS_GROUP_ID;
+  }
+
+  public static String getConfigRegionDir() {
+    return CONF.getConsensusDir()
+        + File.separator
+        + ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID.getType().getValue()
+        + "_"
+        + ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID.getId();
   }
 
   public IConsensus getConsensusImpl() {