You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/03/15 01:30:39 UTC

[iotdb] branch new_mpp updated: [To_new_mpp] Basic query memory control (#5216)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_mpp by this push:
     new 7c04879  [To_new_mpp] Basic query memory control (#5216)
7c04879 is described below

commit 7c04879b885f7931cfd4bfc569979913114a9772
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Tue Mar 15 09:22:55 2022 +0800

    [To_new_mpp] Basic query memory control (#5216)
    
    * [To_new_mpp] Basic query memory control
    
    * Add license
---
 .../iotdb/mpp/memory/LocalMemoryManager.java       |  46 +++++++
 .../org/apache/iotdb/mpp/memory/MemoryPool.java    |  90 +++++++++++++
 .../apache/iotdb/mpp/memory/MemoryPoolTest.java    | 150 +++++++++++++++++++++
 3 files changed, 286 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java
new file mode 100644
index 0000000..cc5305e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.memory;
+
+/**
+ * Manages memory of a data node. The memory is divided into two memory pools so that the memory for
+ * read and for write can be isolated.
+ */
+public class LocalMemoryManager {
+
+  private final long maxBytes;
+  private final MemoryPool queryPool;
+
+  public LocalMemoryManager() {
+    long maxMemory = Runtime.getRuntime().maxMemory();
+    // Save 20% memory for untracked allocations.
+    maxBytes = (long) (maxMemory * 0.8);
+    // Allocate 50% memory for query execution.
+    queryPool = new MemoryPool("query", (long) (maxBytes * 0.5));
+  }
+
+  public long getMaxBytes() {
+    return maxBytes;
+  }
+
+  public MemoryPool getQueryPool() {
+    return queryPool;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java
new file mode 100644
index 0000000..29b7228
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.memory;
+
+import org.apache.commons.lang3.Validate;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Manages certain amount of memory. */
+public class MemoryPool {
+
+  private final String id;
+  private final long maxBytes;
+
+  private long reservedBytes = 0L;
+  private final Map<String, Long> queryMemoryReservations = new HashMap<>();
+
+  public MemoryPool(String id, long maxBytes) {
+    this.id = Validate.notNull(id);
+    Validate.isTrue(maxBytes > 0L);
+    this.maxBytes = maxBytes;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public long getMaxBytes() {
+    return maxBytes;
+  }
+
+  public boolean tryReserve(String queryId, long bytes) {
+    Validate.notNull(queryId);
+    Validate.isTrue(bytes > 0L);
+
+    synchronized (this) {
+      if (maxBytes - reservedBytes < bytes) {
+        return false;
+      }
+      reservedBytes += bytes;
+      queryMemoryReservations.merge(queryId, bytes, Long::sum);
+    }
+
+    return true;
+  }
+
+  public synchronized void free(String queryId, long bytes) {
+    Validate.notNull(queryId);
+    Validate.isTrue(bytes > 0L);
+
+    Long queryReservedBytes = queryMemoryReservations.get(queryId);
+    Validate.notNull(queryReservedBytes);
+    Validate.isTrue(bytes <= queryReservedBytes);
+
+    queryReservedBytes -= bytes;
+    if (queryReservedBytes == 0) {
+      queryMemoryReservations.remove(queryId);
+    } else {
+      queryMemoryReservations.put(queryId, queryReservedBytes);
+    }
+
+    reservedBytes -= bytes;
+  }
+
+  public synchronized long getQueryMemoryReservedBytes(String queryId) {
+    return queryMemoryReservations.getOrDefault(queryId, 0L);
+  }
+
+  public long getReservedBytes() {
+    return reservedBytes;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java b/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java
new file mode 100644
index 0000000..cb76b93
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.memory;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MemoryPoolTest {
+
+  MemoryPool pool;
+
+  @Before
+  public void before() {
+    pool = new MemoryPool("test", 1024L);
+  }
+
+  @Test
+  public void testReserve() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+  }
+
+  @Test
+  public void testReserveZero() {
+    String queryId = "q0";
+    try {
+      pool.tryReserve(queryId, 0L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+
+  @Test
+  public void testReserveNegative() {
+    String queryId = "q0";
+    try {
+      pool.tryReserve(queryId, -1L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+
+  @Test
+  public void testReserveAll() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 1024L));
+    Assert.assertEquals(1024L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(1024L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+  }
+
+  @Test
+  public void testOverReserve() {
+    String queryId = "q0";
+    Assert.assertFalse(pool.tryReserve(queryId, 1025L));
+    Assert.assertEquals(0L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(0L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+  }
+
+  @Test
+  public void testFree() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    pool.free(queryId, 256L);
+    Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(256L, pool.getReservedBytes());
+  }
+
+  @Test
+  public void testFreeAll() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    pool.free(queryId, 512L);
+    Assert.assertEquals(0L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(0L, pool.getReservedBytes());
+  }
+
+  @Test
+  public void testFreeZero() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    pool.free(queryId, 256L);
+    Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(256L, pool.getReservedBytes());
+  }
+
+  @Test
+  public void testFreeNegative() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    try {
+      pool.free(queryId, -1L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+
+  @Test
+  public void testOverFree() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    try {
+      pool.free(queryId, 513L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+}