You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2023/02/05 08:32:06 UTC
[iotdb] 10/10: fix conflict
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch geely_car_0205_confignode
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c74e3e2b18c0ce3096f241889c9e3d8e19f1bc9f
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Sun Feb 5 16:23:32 2023 +0800
fix conflict
---
.../request/write/SimpleConsensusLogWriter.java | 3 +
.../statemachine/ConfigNodeRegionStateMachine.java | 20 ++---
.../iotdb/confignode/manager/ConfigManager.java | 90 +++-------------------
3 files changed, 19 insertions(+), 94 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/SimpleConsensusLogWriter.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/SimpleConsensusLogWriter.java
new file mode 100644
index 0000000000..313f71ef92
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/SimpleConsensusLogWriter.java
@@ -0,0 +1,3 @@
+package org.apache.iotdb.confignode.consensus.request.write;
+
+public class SimpleConsensusLogWriter {}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
index 16d75de30b..72e68c1cfe 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
@@ -51,6 +51,7 @@ import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/** StateMachine for ConfigNodeRegion */
@@ -68,6 +69,7 @@ public class ConfigNodeRegionStateMachine
private File logFile;
private int startIndex;
private int endIndex;
+ private ScheduledExecutorService executorService;
private static final String CURRENT_FILE_DIR =
CONF.getConsensusDir() + File.separator + "simple" + File.separator + "current";
@@ -302,15 +304,10 @@ public class ConfigNodeRegionStateMachine
try {
ByteBuffer buffer = plan.serializeToByteBuffer();
- // The method logWriter.write will execute flip() firstly, so we must make position==limit
buffer.position(buffer.limit());
logWriter.write(buffer);
- logWriter.close();
+
endIndex = endIndex + 1;
- File tmpLogFile = new File(PROGRESS_FILE_PATH + endIndex);
- Files.move(logFile.toPath(), tmpLogFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
- logFile = tmpLogFile;
- logWriter = new LogWriter(logFile, false);
} catch (Exception e) {
LOGGER.error(
"Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e);
@@ -323,14 +320,6 @@ public class ConfigNodeRegionStateMachine
String[] list = new File(CURRENT_FILE_DIR).list();
if (list != null && list.length != 0) {
for (String logFileName : list) {
- int tmp = Integer.parseInt(logFileName.substring(logFileName.lastIndexOf("_") + 1));
- if (logFileName.startsWith("log_inprogress")) {
- endIndex = tmp;
- } else {
- if (startIndex < tmp) {
- startIndex = tmp;
- }
- }
File logFile =
SystemFileFactory.INSTANCE.getFile(CURRENT_FILE_DIR + File.separator + logFileName);
SingleFileLogReader logReader;
@@ -343,7 +332,10 @@ public class ConfigNodeRegionStateMachine
e);
continue;
}
+
+ startIndex = endIndex;
while (logReader.hasNext()) {
+ endIndex++;
// read and re-serialize the PhysicalPlan
ConfigPhysicalPlan nextPlan = logReader.next();
try {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 3432916141..1d4c57716a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -217,6 +217,10 @@ public class ConfigManager implements IManager {
private final RetryFailedTasksThread retryFailedTasksThread;
+ private static int schemaPartitionCount = 0;
+
+ private static int dataPartitionCount = 0;
+
public ConfigManager() throws IOException {
// Build the persistence module
NodeInfo nodeInfo = new NodeInfo();
@@ -650,40 +654,10 @@ public class ConfigManager implements IManager {
partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionPlan);
resp = queryResult.convertToRpcSchemaPartitionTableResp();
- StringBuilder devicePathString = new StringBuilder("{");
- for (String devicePath : devicePaths) {
- devicePathString.append("\n\t").append(devicePath).append(",");
- }
- devicePathString.append("\n}");
-
- StringBuilder schemaPartitionRespString = new StringBuilder("{");
- schemaPartitionRespString
- .append("\n\tTSStatus=")
- .append(resp.getStatus().getCode())
- .append(",");
- Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
- resp.getSchemaPartitionTable();
- for (Map.Entry<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> databaseEntry :
- schemaPartitionTable.entrySet()) {
- String database = databaseEntry.getKey();
- schemaPartitionRespString.append("\n\tDatabase=").append(database).append(": {");
- for (Map.Entry<TSeriesPartitionSlot, TConsensusGroupId> slotEntry :
- databaseEntry.getValue().entrySet()) {
- schemaPartitionRespString
- .append("\n\t\t")
- .append(slotEntry.getKey())
- .append(", ")
- .append(slotEntry.getValue())
- .append(",");
- }
- schemaPartitionRespString.append("\n\t},");
+ schemaPartitionCount += 1;
+ if (schemaPartitionCount % 100 == 0) {
+ LOGGER.info("schemaPartition req count: {}", schemaPartitionCount);
}
- schemaPartitionRespString.append("\n}");
-
- LOGGER.info(
- "[GetOrCreateSchemaPartition]:\nReceive PathPatternTree: {}, Return TSchemaPartitionTableResp: {}",
- devicePathString,
- schemaPartitionRespString);
return resp;
}
@@ -753,54 +727,10 @@ public class ConfigManager implements IManager {
resp = queryResult.convertToTDataPartitionTableResp();
- StringBuilder partitionSlotsMapString = new StringBuilder("{");
- for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> databaseEntry :
- getOrCreateDataPartitionReq.getPartitionSlotsMap().entrySet()) {
- String database = databaseEntry.getKey();
- partitionSlotsMapString.append("\n\tDatabase=").append(database).append(": {");
- for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> slotEntry :
- databaseEntry.getValue().entrySet()) {
- partitionSlotsMapString
- .append("\n\t\t")
- .append(slotEntry.getKey())
- .append(",")
- .append(slotEntry.getValue());
- }
- partitionSlotsMapString.append("\n\t},");
+ dataPartitionCount++;
+ if (dataPartitionCount % 100 == 0) {
+ LOGGER.info("dataPartition req count: {}", dataPartitionCount);
}
- partitionSlotsMapString.append("\n}");
-
- StringBuilder dataPartitionRespString = new StringBuilder("{");
- dataPartitionRespString.append("\n\tTSStatus=").append(resp.getStatus().getCode()).append(",");
- Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
- dataPartitionTable = resp.getDataPartitionTable();
- for (Map.Entry<
- String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
- databaseEntry : dataPartitionTable.entrySet()) {
- String database = databaseEntry.getKey();
- dataPartitionRespString.append("\n\tDatabase=").append(database).append(": {");
- for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
- seriesSlotEntry : databaseEntry.getValue().entrySet()) {
- dataPartitionRespString.append("\n\t\t").append(seriesSlotEntry.getKey()).append(": {");
- for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> timeSlotEntry :
- seriesSlotEntry.getValue().entrySet()) {
- dataPartitionRespString
- .append("\n\t\t\t")
- .append(timeSlotEntry.getKey())
- .append(", ")
- .append(timeSlotEntry.getValue())
- .append(",");
- }
- dataPartitionRespString.append("\n\t\t},");
- }
- dataPartitionRespString.append("\n\t}");
- }
- dataPartitionRespString.append("\n}");
-
- LOGGER.info(
- "[GetOrCreateDataPartition]:\nReceive PartitionSlotsMap: {}, Return TDataPartitionTableResp: {}",
- partitionSlotsMapString,
- dataPartitionRespString);
return resp;
}