You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2021/01/18 15:14:14 UTC
[ignite] branch master updated: IGNITE-14003 Improve heap consumption on reserving rebalance iterator. (#8671)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f4bf825 IGNITE-14003 Improve heap consumption on reserving rebalance iterator. (#8671)
f4bf825 is described below
commit f4bf8253f89ddccd36ee8e03d3e51ddc4bc039b4
Author: Ivan Daschinskiy <iv...@gmail.com>
AuthorDate: Mon Jan 18 18:13:40 2021 +0300
IGNITE-14003 Improve heap consumption on reserving rebalance iterator. (#8671)
---
.../cache/IgniteCacheOffheapManagerImpl.java | 19 ++-
.../RebalanceIteratorLargeEntriesOOMTest.java | 137 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite2.java | 3 +
3 files changed, 155 insertions(+), 4 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 773297f..267430a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1140,14 +1140,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return null;
}
- CacheDataStore data = partitionData(part);
-
- final GridCursor<? extends CacheDataRow> cur = data.cursor(CacheDataRowAdapter.RowData.FULL_WITH_HINTS);
+ final CacheDataStore data = partitionData(part);
return new GridCloseableIteratorAdapter<CacheDataRow>() {
/** */
private CacheDataRow next;
+ /** */
+ private GridCursor<? extends CacheDataRow> cur;
+
@Override protected CacheDataRow onNext() {
CacheDataRow res = next;
@@ -1157,13 +1158,21 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
@Override protected boolean onHasNext() throws IgniteCheckedException {
+ if (cur == null)
+ cur = data.cursor(CacheDataRowAdapter.RowData.FULL_WITH_HINTS);
+
if (next != null)
return true;
if (cur.next())
next = cur.get();
- return next != null;
+ boolean hasNext = next != null;
+
+ if (!hasNext)
+ cur = null;
+
+ return hasNext;
}
@Override protected void onClose() throws IgniteCheckedException {
@@ -1171,6 +1180,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
: "Partition should be in OWNING state and has at least 1 reservation: " + loc;
loc.release();
+
+ cur = null;
}
};
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RebalanceIteratorLargeEntriesOOMTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RebalanceIteratorLargeEntriesOOMTest.java
new file mode 100644
index 0000000..f8be92d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RebalanceIteratorLargeEntriesOOMTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
+import org.junit.Test;
+
+/**
+ * Tests rebalance of huge cache with large entries.
+ * OOM should not occur on supplier during handling demand message.
+ */
+public class RebalanceIteratorLargeEntriesOOMTest extends GridCommonAbstractTest {
+ /** Name of the replicated cache populated by the test. */
+ private static final String REPLICATED_CACHE_NAME = "repl-cache";
+
+ /** Number of bytes in one kibibyte. */
+ private static final int KB = 1 << 10;
+
+ /** Number of bytes in one gibibyte. */
+ private static final int GB = 1 << 30;
+
+ /** Region size in bytes used to derive the per-loader key count. */
+ private static final long MAX_REGION_SIZE = GB;
+
+ /** Size in bytes of each value put into the cache. */
+ private static final int PAYLOAD_SIZE = 200 * KB;
+
+ /** Number of threads that stream data concurrently. */
+ private static final int NUM_LOAD_THREADS = 4;
+
+ /**
+ * Get number of items per loader in order to fill 90% of memory region.
+ */
+ private static final long INTERVAL = MAX_REGION_SIZE * 9 / (10 * NUM_LOAD_THREADS * PAYLOAD_SIZE);
+
+ /** {@inheritDoc} */
+ @Override protected List<String> additionalRemoteJvmArgs() {
+ return Arrays.asList("-Xmx256m", "-Xms256m");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean isRemoteJvm(String igniteInstanceName) {
+ return igniteInstanceName.startsWith("supplier");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setInitialSize(GB)
+ .setMaxSize(GB)
+ )
+ );
+ cfg.setCacheConfiguration(new CacheConfiguration<>(REPLICATED_CACHE_NAME)
+ .setCacheMode(CacheMode.REPLICATED));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ IgniteProcessProxy.killAll();
+ }
+
+ /** Starts the supplier, loads the cache via a client, then starts the demander and awaits partition map exchange. */
+ @Test
+ public void testRebalance() throws Exception {
+ startSupplier();
+ IgniteEx client = startClientGrid("client");
+
+ GridTestUtils.runMultiThreaded((Integer idx) -> {
+ try (IgniteDataStreamer<Object, Object> streamer = client.dataStreamer(REPLICATED_CACHE_NAME)) {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ byte[] buf = new byte[PAYLOAD_SIZE];
+
+ for (long i = idx * INTERVAL; i < (idx + 1) * INTERVAL; i++) {
+ rnd.nextBytes(buf);
+
+ streamer.addData(i, buf);
+ }
+
+ streamer.flush();
+ }
+ }, NUM_LOAD_THREADS, "loader-");
+
+ startDemander();
+ awaitPartitionMapExchange(true, true, null);
+ }
+
+ /**
+ * Starts the remote supplier: node 0 is started first and stopped again once {@code supplier} is up.
+ */
+ private void startSupplier() throws Exception {
+ startGrid(0);
+
+ startGrid("supplier");
+
+ stopGrid(0);
+ }
+
+ /** Starts the node named {@code demander}. */
+ private void startDemander() throws Exception {
+ startGrid("demander");
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 4fb38bd..bdb174a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitNearReade
import org.apache.ignite.internal.processors.cache.MemoryPolicyConfigValidationTest;
import org.apache.ignite.internal.processors.cache.NoPresentCacheInterceptorOnClientTest;
import org.apache.ignite.internal.processors.cache.NonAffinityCoordinatorDynamicStartStopTest;
+import org.apache.ignite.internal.processors.cache.RebalanceIteratorLargeEntriesOOMTest;
import org.apache.ignite.internal.processors.cache.TransactionValidationTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheDetectLostPartitionsTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTest;
@@ -386,6 +387,8 @@ public class IgniteCacheTestSuite2 {
GridTestUtils.addTestIfNeeded(suite, CacheDetectLostPartitionsTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, TransactionValidationTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, RebalanceIteratorLargeEntriesOOMTest.class, ignoredTests);
+
return suite;
}
}