You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2021/01/18 15:14:14 UTC

[ignite] branch master updated: IGNITE-14003 Improve heap consumption on reserving rebalance iterator. (#8671)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f4bf825  IGNITE-14003 Improve heap consumption on reserving rebalance iterator. (#8671)
f4bf825 is described below

commit f4bf8253f89ddccd36ee8e03d3e51ddc4bc039b4
Author: Ivan Daschinskiy <iv...@gmail.com>
AuthorDate: Mon Jan 18 18:13:40 2021 +0300

    IGNITE-14003 Improve heap consumption on reserving rebalance iterator. (#8671)
---
 .../cache/IgniteCacheOffheapManagerImpl.java       |  19 ++-
 .../RebalanceIteratorLargeEntriesOOMTest.java      | 137 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite2.java   |   3 +
 3 files changed, 155 insertions(+), 4 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 773297f..267430a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1140,14 +1140,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             return null;
         }
 
-        CacheDataStore data = partitionData(part);
-
-        final GridCursor<? extends CacheDataRow> cur = data.cursor(CacheDataRowAdapter.RowData.FULL_WITH_HINTS);
+        final CacheDataStore data = partitionData(part);
 
         return new GridCloseableIteratorAdapter<CacheDataRow>() {
             /** */
             private CacheDataRow next;
 
+            /** */
+            private GridCursor<? extends CacheDataRow> cur;
+
             @Override protected CacheDataRow onNext() {
                 CacheDataRow res = next;
 
@@ -1157,13 +1158,21 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             }
 
             @Override protected boolean onHasNext() throws IgniteCheckedException {
+                if (cur == null)
+                    cur = data.cursor(CacheDataRowAdapter.RowData.FULL_WITH_HINTS);
+
                 if (next != null)
                     return true;
 
                 if (cur.next())
                     next = cur.get();
 
-                return next != null;
+                boolean hasNext = next != null;
+
+                if (!hasNext)
+                    cur = null;
+
+                return hasNext;
             }
 
             @Override protected void onClose() throws IgniteCheckedException {
@@ -1171,6 +1180,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                     : "Partition should be in OWNING state and has at least 1 reservation: " + loc;
 
                 loc.release();
+
+                cur = null;
             }
         };
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RebalanceIteratorLargeEntriesOOMTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RebalanceIteratorLargeEntriesOOMTest.java
new file mode 100644
index 0000000..f8be92d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RebalanceIteratorLargeEntriesOOMTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
+import org.junit.Test;
+
+/**
+ * Tests rebalance of huge cache with large entries.
+ * OOM should not occur on supplier during handling demand message.
+ */
+public class RebalanceIteratorLargeEntriesOOMTest extends GridCommonAbstractTest {
+    /** */
+    private static final String REPLICATED_CACHE_NAME = "repl-cache";
+
+    /** */
+    private static final int KB = 1 << 10;
+
+    /** */
+    private static final int GB = 1 << 30;
+
+    /** */
+    private static final long MAX_REGION_SIZE = GB;
+
+    /** */
+    private static final int PAYLOAD_SIZE = 200 * KB;
+
+    /** */
+    private static final int NUM_LOAD_THREADS = 4;
+
+    /**
+     * Get number of items per loader in order to fill 90% of memory region.
+     */
+    private static final long INTERVAL = MAX_REGION_SIZE * 9 / (10 * NUM_LOAD_THREADS * PAYLOAD_SIZE);
+
+    /** {@inheritDoc} */
+    @Override protected List<String> additionalRemoteJvmArgs() {
+        return Arrays.asList("-Xmx256m", "-Xms256m");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean isRemoteJvm(String igniteInstanceName) {
+        return igniteInstanceName.startsWith("supplier");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setInitialSize(GB)
+                .setMaxSize(GB)
+            )
+        );
+        cfg.setCacheConfiguration(new CacheConfiguration<>(REPLICATED_CACHE_NAME)
+            .setCacheMode(CacheMode.REPLICATED));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        IgniteProcessProxy.killAll();
+    }
+
+    /** */
+    @Test
+    public void testRebalance() throws Exception {
+        startSupplier();
+        IgniteEx client = startClientGrid("client");
+
+        GridTestUtils.runMultiThreaded((Integer idx) -> {
+            try (IgniteDataStreamer<Object, Object> streamer = client.dataStreamer(REPLICATED_CACHE_NAME)) {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                byte[] buf = new byte[PAYLOAD_SIZE];
+
+                for (long i = idx * INTERVAL; i < (idx + 1) * INTERVAL; i++) {
+                    rnd.nextBytes(buf);
+
+                    streamer.addData(i, buf);
+                }
+
+                streamer.flush();
+            }
+        }, NUM_LOAD_THREADS, "loader-");
+
+        startDemander();
+        awaitPartitionMapExchange(true, true, null);
+    }
+
+    /**
+     * Start remote supplier.
+     */
+    private void startSupplier() throws Exception {
+        startGrid(0);
+
+        startGrid("supplier");
+
+        stopGrid(0);
+    }
+
+    /** */
+    private void startDemander() throws Exception {
+        startGrid("demander");
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 4fb38bd..bdb174a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitNearReade
 import org.apache.ignite.internal.processors.cache.MemoryPolicyConfigValidationTest;
 import org.apache.ignite.internal.processors.cache.NoPresentCacheInterceptorOnClientTest;
 import org.apache.ignite.internal.processors.cache.NonAffinityCoordinatorDynamicStartStopTest;
+import org.apache.ignite.internal.processors.cache.RebalanceIteratorLargeEntriesOOMTest;
 import org.apache.ignite.internal.processors.cache.TransactionValidationTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheDetectLostPartitionsTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTest;
@@ -386,6 +387,8 @@ public class IgniteCacheTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, CacheDetectLostPartitionsTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, TransactionValidationTest.class, ignoredTests);
 
+        GridTestUtils.addTestIfNeeded(suite, RebalanceIteratorLargeEntriesOOMTest.class, ignoredTests);
+
         return suite;
     }
 }