You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/03/21 02:33:28 UTC
[iotdb] 02/07: [To_new_mpp] Basic query memory control (#5216)
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 199930b2b121286712e82753198847bac58ae5fc
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Tue Mar 15 09:22:55 2022 +0800
[To_new_mpp] Basic query memory control (#5216)
* [To_new_mpp] Basic query memory control
* Add license
---
.../iotdb/mpp/memory/LocalMemoryManager.java | 46 +++++++
.../org/apache/iotdb/mpp/memory/MemoryPool.java | 90 +++++++++++++
.../apache/iotdb/mpp/memory/MemoryPoolTest.java | 150 +++++++++++++++++++++
3 files changed, 286 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java
new file mode 100644
index 0000000..cc5305e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.memory;
+
+/**
+ * Owns the memory budget of a data node and the memory pools carved out of it. At present a single
+ * pool, dedicated to query execution, is created.
+ */
+public class LocalMemoryManager {
+
+  /** Share of the JVM max memory that is tracked; the rest is left for untracked allocations. */
+  private static final double TRACKED_MEMORY_RATIO = 0.8;
+
+  /** Share of the tracked memory that is handed to the query pool. */
+  private static final double QUERY_MEMORY_RATIO = 0.5;
+
+  private final long maxBytes;
+  private final MemoryPool queryPool;
+
+  /** Derives the budget from {@link Runtime#maxMemory()} and creates the "query" pool. */
+  public LocalMemoryManager() {
+    this.maxBytes = (long) (Runtime.getRuntime().maxMemory() * TRACKED_MEMORY_RATIO);
+    this.queryPool = new MemoryPool("query", (long) (this.maxBytes * QUERY_MEMORY_RATIO));
+  }
+
+  /** Returns the number of bytes tracked by this manager. */
+  public long getMaxBytes() {
+    return this.maxBytes;
+  }
+
+  /** Returns the pool that query execution reserves its memory from. */
+  public MemoryPool getQueryPool() {
+    return this.queryPool;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java
new file mode 100644
index 0000000..29b7228
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.memory;
+
+import org.apache.commons.lang3.Validate;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Manages certain amount of memory. */
+public class MemoryPool {
+
+ private final String id;
+ private final long maxBytes;
+
+ private long reservedBytes = 0L;
+ private final Map<String, Long> queryMemoryReservations = new HashMap<>();
+
+ public MemoryPool(String id, long maxBytes) {
+ this.id = Validate.notNull(id);
+ Validate.isTrue(maxBytes > 0L);
+ this.maxBytes = maxBytes;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public long getMaxBytes() {
+ return maxBytes;
+ }
+
+ public boolean tryReserve(String queryId, long bytes) {
+ Validate.notNull(queryId);
+ Validate.isTrue(bytes > 0L);
+
+ synchronized (this) {
+ if (maxBytes - reservedBytes < bytes) {
+ return false;
+ }
+ reservedBytes += bytes;
+ queryMemoryReservations.merge(queryId, bytes, Long::sum);
+ }
+
+ return true;
+ }
+
+ public synchronized void free(String queryId, long bytes) {
+ Validate.notNull(queryId);
+ Validate.isTrue(bytes > 0L);
+
+ Long queryReservedBytes = queryMemoryReservations.get(queryId);
+ Validate.notNull(queryReservedBytes);
+ Validate.isTrue(bytes <= queryReservedBytes);
+
+ queryReservedBytes -= bytes;
+ if (queryReservedBytes == 0) {
+ queryMemoryReservations.remove(queryId);
+ } else {
+ queryMemoryReservations.put(queryId, queryReservedBytes);
+ }
+
+ reservedBytes -= bytes;
+ }
+
+ public synchronized long getQueryMemoryReservedBytes(String queryId) {
+ return queryMemoryReservations.getOrDefault(queryId, 0L);
+ }
+
+ public long getReservedBytes() {
+ return reservedBytes;
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java b/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java
new file mode 100644
index 0000000..cb76b93
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.memory;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Unit tests for the reserve/free accounting of {@link MemoryPool}, using a 1024-byte pool. */
+public class MemoryPoolTest {
+
+  private MemoryPool pool;
+
+  @Before
+  public void before() {
+    pool = new MemoryPool("test", 1024L);
+  }
+
+  @Test
+  public void testReserve() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+  }
+
+  @Test
+  public void testReserveZero() {
+    String queryId = "q0";
+    try {
+      pool.tryReserve(queryId, 0L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+      // expected: a reservation must be positive
+    }
+  }
+
+  @Test
+  public void testReserveNegative() {
+    String queryId = "q0";
+    try {
+      pool.tryReserve(queryId, -1L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+      // expected: a reservation must be positive
+    }
+  }
+
+  @Test
+  public void testReserveAll() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 1024L));
+    Assert.assertEquals(1024L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(1024L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+  }
+
+  @Test
+  public void testOverReserve() {
+    String queryId = "q0";
+    Assert.assertFalse(pool.tryReserve(queryId, 1025L));
+    Assert.assertEquals(0L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(0L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+  }
+
+  @Test
+  public void testFree() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    pool.free(queryId, 256L);
+    Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(256L, pool.getReservedBytes());
+  }
+
+  @Test
+  public void testFreeAll() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    pool.free(queryId, 512L);
+    Assert.assertEquals(0L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(0L, pool.getReservedBytes());
+  }
+
+  @Test
+  public void testFreeZero() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    // Freeing zero bytes must be rejected, mirroring testReserveZero.
+    try {
+      pool.free(queryId, 0L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+      // expected: the amount to free must be positive
+    }
+  }
+
+  @Test
+  public void testFreeNegative() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    try {
+      pool.free(queryId, -1L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+      // expected: the amount to free must be positive
+    }
+  }
+
+  @Test
+  public void testOverFree() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    try {
+      pool.free(queryId, 513L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+      // expected: cannot free more than the query has reserved
+    }
+  }
+}