You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/03/07 00:35:10 UTC
[iotdb] 03/03: [To rel/1.1][IOTDB-5620] Fix flush stuck when there are a lot of time partitions in each DataRegion (#9225)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rc/1.1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 32f9db010848cbbc251b1fecbc68f4b3e5404420
Author: Haonan <hh...@outlook.com>
AuthorDate: Mon Mar 6 20:42:15 2023 +0800
[To rel/1.1][IOTDB-5620] Fix flush stuck when there are a lot of time partitions in each DataRegion (#9225)
---
.../iotdb/it/env/cluster/MppCommonConfig.java | 6 ++
.../it/env/cluster/MppSharedCommonConfig.java | 7 +++
.../iotdb/it/env/remote/RemoteCommonConfig.java | 5 ++
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
.../iotdb/db/it/IoTDBInsertMultiPartitionIT.java | 71 ++++++++++++++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 1 -
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 2 +-
.../engine/storagegroup/TimePartitionManager.java | 9 ++-
8 files changed, 100 insertions(+), 3 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
index c1bcbde4ec..fe9f3a6767 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
@@ -334,4 +334,10 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
setProperty("schema_memory_allocate_proportion", String.valueOf(schemaMemoryAllocate));
return this;
}
+
+ @Override
+ public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) {
+ setProperty("write_memory_proportion", writeMemoryProportion);
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
index ddaba5a331..275ed1f8f8 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
@@ -343,4 +343,11 @@ public class MppSharedCommonConfig implements CommonConfig {
cnConfig.setSchemaMemoryAllocate(schemaMemoryAllocate);
return this;
}
+
+ @Override
+ public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) {
+ dnConfig.setWriteMemoryProportion(writeMemoryProportion);
+ cnConfig.setWriteMemoryProportion(writeMemoryProportion);
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
index 440b38253e..526df5dbab 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
@@ -246,4 +246,9 @@ public class RemoteCommonConfig implements CommonConfig {
public CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate) {
return this;
}
+
+ @Override
+ public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) {
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 59dcd778ab..4792b8b3e7 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -110,4 +110,6 @@ public interface CommonConfig {
CommonConfig setSeriesSlotNum(int seriesSlotNum);
CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate);
+
+ CommonConfig setWriteMemoryProportion(String writeMemoryProportion);
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBInsertMultiPartitionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBInsertMultiPartitionIT.java
new file mode 100644
index 0000000000..fac1175f74
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBInsertMultiPartitionIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.itbase.category.RemoteIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class, RemoteIT.class})
+public class IoTDBInsertMultiPartitionIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getDataNodeCommonConfig()
+ .setWriteMemoryProportion("10000000:1");
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testInsertMultiPartition() {
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("insert into root.sg.d1(time,s1) values(1,2)");
+ statement.execute("flush");
+ statement.execute("insert into root.sg.d1(time,s1) values(2,2)");
+ statement.execute("insert into root.sg.d1(time,s1) values(604800001,2)");
+ statement.execute("flush");
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 75a461b006..70a0b0f3da 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1785,7 +1785,6 @@ public class IoTDBConfig {
public void setAllocateMemoryForStorageEngine(long allocateMemoryForStorageEngine) {
this.allocateMemoryForStorageEngine = allocateMemoryForStorageEngine;
- this.allocateMemoryForTimePartitionInfo = allocateMemoryForStorageEngine * 50 / 1001;
}
public long getAllocateMemoryForSchema() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index a4c40333ab..0432754ab7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1612,7 +1612,7 @@ public class IoTDBDescriptor {
double memtableProportionForWrite =
((double) (proportionForMemTable)
/ (double) (proportionForMemTable + proportionForTimePartitionInfo));
- Double.parseDouble(properties.getProperty("flush_time_memory_proportion", "0.05"));
+
double timePartitionInfoForWrite =
((double) (proportionForTimePartitionInfo)
/ (double) (proportionForMemTable + proportionForTimePartitionInfo));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java
index 1f77b36b82..640f8dfb95 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java
@@ -24,6 +24,9 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
@@ -31,6 +34,7 @@ import java.util.TreeSet;
/** Manage all the time partitions for all data regions and control the total memory of them */
public class TimePartitionManager {
+ private static final Logger logger = LoggerFactory.getLogger(TimePartitionManager.class);
final Map<DataRegionId, Map<Long, TimePartitionInfo>> timePartitionInfoMap;
long memCost = 0;
@@ -102,7 +106,10 @@ public class TimePartitionManager {
}
while (memCost > timePartitionInfoMemoryThreshold) {
- TimePartitionInfo timePartitionInfo = treeSet.first();
+ TimePartitionInfo timePartitionInfo = treeSet.pollFirst();
+ if (timePartitionInfo == null) {
+ return;
+ }
memCost -= timePartitionInfo.memSize;
DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion(timePartitionInfo.dataRegionId);