You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2023/02/05 08:32:06 UTC

[iotdb] 10/10: fix conflict

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch geely_car_0205_confignode
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c74e3e2b18c0ce3096f241889c9e3d8e19f1bc9f
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Sun Feb 5 16:23:32 2023 +0800

    fix conflict
---
 .../request/write/SimpleConsensusLogWriter.java    |  3 +
 .../statemachine/ConfigNodeRegionStateMachine.java | 20 ++---
 .../iotdb/confignode/manager/ConfigManager.java    | 90 +++-------------------
 3 files changed, 19 insertions(+), 94 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/SimpleConsensusLogWriter.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/SimpleConsensusLogWriter.java
new file mode 100644
index 0000000000..313f71ef92
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/SimpleConsensusLogWriter.java
@@ -0,0 +1,3 @@
+package org.apache.iotdb.confignode.consensus.request.write;
+
+public class SimpleConsensusLogWriter {}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
index 16d75de30b..72e68c1cfe 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
@@ -51,6 +51,7 @@ import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /** StateMachine for ConfigNodeRegion */
@@ -68,6 +69,7 @@ public class ConfigNodeRegionStateMachine
   private File logFile;
   private int startIndex;
   private int endIndex;
+  private ScheduledExecutorService executorService;
 
   private static final String CURRENT_FILE_DIR =
       CONF.getConsensusDir() + File.separator + "simple" + File.separator + "current";
@@ -302,15 +304,10 @@ public class ConfigNodeRegionStateMachine
 
     try {
       ByteBuffer buffer = plan.serializeToByteBuffer();
-      // The method logWriter.write will execute flip() firstly, so we must make position==limit
       buffer.position(buffer.limit());
       logWriter.write(buffer);
-      logWriter.close();
+
       endIndex = endIndex + 1;
-      File tmpLogFile = new File(PROGRESS_FILE_PATH + endIndex);
-      Files.move(logFile.toPath(), tmpLogFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
-      logFile = tmpLogFile;
-      logWriter = new LogWriter(logFile, false);
     } catch (Exception e) {
       LOGGER.error(
           "Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e);
@@ -323,14 +320,6 @@ public class ConfigNodeRegionStateMachine
     String[] list = new File(CURRENT_FILE_DIR).list();
     if (list != null && list.length != 0) {
       for (String logFileName : list) {
-        int tmp = Integer.parseInt(logFileName.substring(logFileName.lastIndexOf("_") + 1));
-        if (logFileName.startsWith("log_inprogress")) {
-          endIndex = tmp;
-        } else {
-          if (startIndex < tmp) {
-            startIndex = tmp;
-          }
-        }
         File logFile =
             SystemFileFactory.INSTANCE.getFile(CURRENT_FILE_DIR + File.separator + logFileName);
         SingleFileLogReader logReader;
@@ -343,7 +332,10 @@ public class ConfigNodeRegionStateMachine
               e);
           continue;
         }
+
+        startIndex = endIndex;
         while (logReader.hasNext()) {
+          endIndex++;
           // read and re-serialize the PhysicalPlan
           ConfigPhysicalPlan nextPlan = logReader.next();
           try {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 3432916141..1d4c57716a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -217,6 +217,10 @@ public class ConfigManager implements IManager {
 
   private final RetryFailedTasksThread retryFailedTasksThread;
 
+  private static int schemaPartitionCount = 0;
+
+  private static int dataPartitionCount = 0;
+
   public ConfigManager() throws IOException {
     // Build the persistence module
     NodeInfo nodeInfo = new NodeInfo();
@@ -650,40 +654,10 @@ public class ConfigManager implements IManager {
         partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionPlan);
     resp = queryResult.convertToRpcSchemaPartitionTableResp();
 
-    StringBuilder devicePathString = new StringBuilder("{");
-    for (String devicePath : devicePaths) {
-      devicePathString.append("\n\t").append(devicePath).append(",");
-    }
-    devicePathString.append("\n}");
-
-    StringBuilder schemaPartitionRespString = new StringBuilder("{");
-    schemaPartitionRespString
-        .append("\n\tTSStatus=")
-        .append(resp.getStatus().getCode())
-        .append(",");
-    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
-        resp.getSchemaPartitionTable();
-    for (Map.Entry<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> databaseEntry :
-        schemaPartitionTable.entrySet()) {
-      String database = databaseEntry.getKey();
-      schemaPartitionRespString.append("\n\tDatabase=").append(database).append(": {");
-      for (Map.Entry<TSeriesPartitionSlot, TConsensusGroupId> slotEntry :
-          databaseEntry.getValue().entrySet()) {
-        schemaPartitionRespString
-            .append("\n\t\t")
-            .append(slotEntry.getKey())
-            .append(", ")
-            .append(slotEntry.getValue())
-            .append(",");
-      }
-      schemaPartitionRespString.append("\n\t},");
+    schemaPartitionCount += 1;
+    if (schemaPartitionCount % 100 == 0) {
+      LOGGER.info("schemaPartition req count: {}", schemaPartitionCount);
     }
-    schemaPartitionRespString.append("\n}");
-
-    LOGGER.info(
-        "[GetOrCreateSchemaPartition]:\nReceive PathPatternTree: {}, Return TSchemaPartitionTableResp: {}",
-        devicePathString,
-        schemaPartitionRespString);
 
     return resp;
   }
@@ -753,54 +727,10 @@ public class ConfigManager implements IManager {
 
     resp = queryResult.convertToTDataPartitionTableResp();
 
-    StringBuilder partitionSlotsMapString = new StringBuilder("{");
-    for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> databaseEntry :
-        getOrCreateDataPartitionReq.getPartitionSlotsMap().entrySet()) {
-      String database = databaseEntry.getKey();
-      partitionSlotsMapString.append("\n\tDatabase=").append(database).append(": {");
-      for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> slotEntry :
-          databaseEntry.getValue().entrySet()) {
-        partitionSlotsMapString
-            .append("\n\t\t")
-            .append(slotEntry.getKey())
-            .append(",")
-            .append(slotEntry.getValue());
-      }
-      partitionSlotsMapString.append("\n\t},");
+    dataPartitionCount++;
+    if (dataPartitionCount % 100 == 0) {
+      LOGGER.info("dataPartition req count: {}", dataPartitionCount);
     }
-    partitionSlotsMapString.append("\n}");
-
-    StringBuilder dataPartitionRespString = new StringBuilder("{");
-    dataPartitionRespString.append("\n\tTSStatus=").append(resp.getStatus().getCode()).append(",");
-    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
-        dataPartitionTable = resp.getDataPartitionTable();
-    for (Map.Entry<
-            String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
-        databaseEntry : dataPartitionTable.entrySet()) {
-      String database = databaseEntry.getKey();
-      dataPartitionRespString.append("\n\tDatabase=").append(database).append(": {");
-      for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
-          seriesSlotEntry : databaseEntry.getValue().entrySet()) {
-        dataPartitionRespString.append("\n\t\t").append(seriesSlotEntry.getKey()).append(": {");
-        for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> timeSlotEntry :
-            seriesSlotEntry.getValue().entrySet()) {
-          dataPartitionRespString
-              .append("\n\t\t\t")
-              .append(timeSlotEntry.getKey())
-              .append(", ")
-              .append(timeSlotEntry.getValue())
-              .append(",");
-        }
-        dataPartitionRespString.append("\n\t\t},");
-      }
-      dataPartitionRespString.append("\n\t}");
-    }
-    dataPartitionRespString.append("\n}");
-
-    LOGGER.info(
-        "[GetOrCreateDataPartition]:\nReceive PartitionSlotsMap: {}, Return TDataPartitionTableResp: {}",
-        partitionSlotsMapString,
-        dataPartitionRespString);
 
     return resp;
   }