You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/05/07 15:21:34 UTC
[ignite] 26/41: GG-17434 Fix memory leak on unstable topology caused by partition reservation
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch gg-18540
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit a813e047f8b091ceaba6820501e6bde9e79274a6
Author: tledkov <tl...@gridgain.com>
AuthorDate: Fri Apr 26 19:54:09 2019 +0300
GG-17434 Fix memory leak on unstable topology caused by partition reservation
---
.../query/h2/twostep/PartitionReservationKey.java | 13 ++
.../h2/twostep/PartitionReservationManager.java | 73 +++++--
.../query/MemLeakOnSqlWithClientReconnectTest.java | 218 +++++++++++++++++++++
.../testsuites/IgniteCacheQuerySelfTestSuite6.java | 4 +-
4 files changed, 290 insertions(+), 18 deletions(-)
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java
index 73984bc..d02b6b8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java
@@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Partition reservation key.
@@ -47,6 +48,13 @@ public class PartitionReservationKey {
return cacheName;
}
+    /**
+     * Gets the topology version this reservation key was created for.
+     *
+     * @return Topology version of reservation.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return this.topVer;
+    }
+
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
@@ -69,4 +77,9 @@ public class PartitionReservationKey {
return res;
}
+
+    /**
+     * @return Human-readable representation of this reservation key.
+     */
+    @Override public String toString() {
+        return S.toString(PartitionReservationKey.class, this);
+    }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
index e91d7d5..89d7a01 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
@@ -16,13 +16,24 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
@@ -30,14 +41,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
@@ -45,19 +48,24 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topolo
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
/**
- * Class responsible for partition reservation for queries executed on local node.
- * Prevents partitions from being eveicted from node during query execution.
+ * Class responsible for partition reservation for queries executed on local node. Prevents partitions from being
+ * evicted from node during query execution.
*/
-public class PartitionReservationManager {
+public class PartitionReservationManager implements PartitionsExchangeAware {
/** Special instance of reservable object for REPLICATED caches. */
private static final ReplicatedReservable REPLICATED_RESERVABLE = new ReplicatedReservable();
/** Kernal context. */
private final GridKernalContext ctx;
- /** Reservations. */
+ /**
+ * Group reservations cache, keyed by cache name and topology version. While the affinity version does not change
+ * and all primary partitions must be reserved, the group reservation is taken from this map instead of creating
+ * a new reservation group. Entries of outdated versions are handled by onDoneAfterTopologyUnlock() after an exchange.
+ */
private final ConcurrentMap<PartitionReservationKey, GridReservable> reservations = new ConcurrentHashMap<>();
+ /** Logger. */
+ private final IgniteLogger log;
+
/**
* Constructor.
*
@@ -65,11 +73,15 @@ public class PartitionReservationManager {
*/
public PartitionReservationManager(GridKernalContext ctx) {
this.ctx = ctx;
+
+ log = ctx.log(PartitionReservationManager.class);
+
+ // Register as a PartitionsExchangeAware component so that onDoneAfterTopologyUnlock() can clean up
+ // the group reservations cache after each exchange.
+ // Done last on purpose: it publishes 'this' to the exchange manager before the constructor returns.
+ // NOTE(review): no matching unregistration is visible in this patch - confirm it happens on node stop.
+ ctx.cache().context().exchange().registerExchangeAwareComponent(this);
}
/**
* @param cacheIds Cache IDs.
- * @param topVer Topology version.
+ * @param reqTopVer Topology version from request.
* @param explicitParts Explicit partitions list.
* @param nodeId Node ID.
* @param reqId Request ID.
@@ -78,18 +90,18 @@ public class PartitionReservationManager {
*/
public PartitionReservation reservePartitions(
@Nullable List<Integer> cacheIds,
- AffinityTopologyVersion topVer,
+ AffinityTopologyVersion reqTopVer,
final int[] explicitParts,
UUID nodeId,
long reqId
) throws IgniteCheckedException {
- assert topVer != null;
+ assert reqTopVer != null;
+
+ AffinityTopologyVersion topVer = ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer);
if (F.isEmpty(cacheIds))
return new PartitionReservation(Collections.emptyList());
- List<GridReservable> reserved = new ArrayList<>();
-
Collection<Integer> partIds;
if (explicitParts == null)
@@ -103,6 +115,8 @@ public class PartitionReservationManager {
partIds.add(explicitPart);
}
+ List<GridReservable> reserved = new ArrayList<>();
+
for (int i = 0; i < cacheIds.size(); i++) {
GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(cacheIds.get(i));
@@ -308,6 +322,31 @@ public class PartitionReservationManager {
}
/**
+     * Cleans up the group reservations cache after an exchange has finished.
+     * <p>
+     * Entries whose key refers to an affinity topology version other than the last one on which affinity changed
+     * are stale: partition group reservations are invalidated, and stale markers of replicated caches (fake
+     * reservations) are removed from the map so that they do not accumulate across affinity changes.
+     * <p>
+     * The cleanup itself is dispatched to the management pool and never runs in the exchange thread.
+     *
+     * @param fut Finished exchange future.
+     */
+    @Override public void onDoneAfterTopologyUnlock(final GridDhtPartitionsExchangeFuture fut) {
+        try {
+            // Must not do anything at the exchange thread. Dispatch to the management thread pool.
+            ctx.closure().runLocal(() -> {
+                    try {
+                        AffinityTopologyVersion topVer = ctx.cache().context().exchange()
+                            .lastAffinityChangedTopologyVersion(fut.topologyVersion());
+
+                        reservations.forEach((key, r) -> {
+                            // Entries of the current affinity version are still valid.
+                            if (F.eq(key.topologyVersion(), topVer))
+                                return;
+
+                            // A replicated marker is a fake reservation: just drop it, nothing else would remove it.
+                            if (r == REPLICATED_RESERVABLE)
+                                reservations.remove(key, r);
+                            else {
+                                assert r instanceof GridDhtPartitionsReservation;
+
+                                ((GridDhtPartitionsReservation)r).invalidate();
+                            }
+                        });
+                    }
+                    catch (Throwable e) {
+                        // Nobody waits for the future returned by runLocal(), so failures must be logged here.
+                        log.error("Unexpected exception on reservations cleanup", e);
+                    }
+                },
+                GridIoPolicy.MANAGEMENT_POOL);
+        }
+        catch (Throwable e) {
+            log.error("Unexpected exception on start reservations cleanup", e);
+        }
+    }
+
+ /**
* Mapper fake reservation object for replicated caches.
*/
private static class ReplicatedReservable implements GridReservable {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MemLeakOnSqlWithClientReconnectTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MemLeakOnSqlWithClientReconnectTest.java
new file mode 100644
index 0000000..ad56e11
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MemLeakOnSqlWithClientReconnectTest.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests for group reservation leaks at the PartitionReservationManager on unstable topology.
+ */
+public class MemLeakOnSqlWithClientReconnectTest extends AbstractIndexingCommonTest {
+    /** Keys count. */
+    private static final int KEY_CNT = 10;
+
+    /** Base query iterations count (warm-up phase; the main phase runs ten times more). */
+    private static final int ITERS = 2000;
+
+    /** Replicated cache schema name. */
+    private static final String REPL_SCHEMA = "REPL";
+
+    /** Partitioned cache schema name. */
+    private static final String PART_SCHEMA = "PART";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (igniteInstanceName.startsWith("cli"))
+            cfg.setClientMode(true).setGridLogger(new NullLogger());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGrid();
+
+        IgniteCache<Long, Long> partCache = grid().createCache(new CacheConfiguration<Long, Long>()
+            .setName("part")
+            .setSqlSchema(PART_SCHEMA)
+            .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class)
+                .setTableName("test")
+                .addQueryField("id", Long.class.getName(), null)
+                .addQueryField("val", Long.class.getName(), null)
+                .setKeyFieldName("id")
+                .setValueFieldName("val")
+            ))
+            .setAffinity(new RendezvousAffinityFunction(false, 10)));
+
+        IgniteCache<Long, Long> replCache = grid().createCache(new CacheConfiguration<Long, Long>()
+            .setName("repl")
+            .setSqlSchema(REPL_SCHEMA)
+            .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class)
+                .setTableName("test")
+                .addQueryField("id", Long.class.getName(), null)
+                .addQueryField("val", Long.class.getName(), null)
+                .setKeyFieldName("id")
+                .setValueFieldName("val")))
+            .setCacheMode(CacheMode.REPLICATED));
+
+        for (long i = 0; i < KEY_CNT; ++i) {
+            partCache.put(i, i);
+            replCache.put(i, i);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * Test partition group reservation leaks on partitioned cache.
+     *
+     * @throws Exception On error.
+     */
+    @Test
+    public void testPartitioned() throws Exception {
+        checkReservationLeak(false);
+    }
+
+    /**
+     * Test partition group reservation leaks on replicated cache.
+     *
+     * @throws Exception On error.
+     */
+    @Test
+    public void testReplicated() throws Exception {
+        checkReservationLeak(true);
+    }
+
+    /**
+     * Check partition group reservation leaks while client nodes are constantly joining and leaving.
+     *
+     * @param replicated Flag to run query on partitioned or replicated cache.
+     * @throws Exception On error.
+     */
+    private void checkReservationLeak(boolean replicated) throws Exception {
+        final AtomicInteger cliNum = new AtomicInteger();
+        final AtomicBoolean end = new AtomicBoolean();
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+                String name = "cli_" + cliNum.getAndIncrement();
+
+                while (!end.get()) {
+                    try {
+                        startGrid(name);
+
+                        U.sleep(10);
+
+                        stopGrid(name);
+                    }
+                    catch (Exception e) {
+                        // Keep the cause: it is reported through fut.get() below.
+                        throw new AssertionError("Unexpected exception on start test client node", e);
+                    }
+                }
+            },
+            10, "cli-restart");
+
+        try {
+            String mainCliName = "cli-main";
+
+            IgniteEx cli = startGrid(mainCliName);
+
+            // Warm up.
+            runQuery(cli, ITERS, replicated);
+
+            int baseReservations = reservationCount(grid());
+
+            // Run multiple queries on unstable topology.
+            runQuery(cli, ITERS * 10, replicated);
+
+            int curReservations = reservationCount(grid());
+
+            assertTrue("Reservations leaks: [base=" + baseReservations + ", cur=" + curReservations + ']',
+                curReservations < baseReservations * 2);
+
+            log.info("Reservations OK: [base=" + baseReservations + ", cur=" + curReservations + ']');
+        }
+        finally {
+            end.set(true);
+        }
+
+        fut.get();
+    }
+
+    /**
+     * @param ign Ignite.
+     * @param iters Number of query executions.
+     * @param repl Run on replicated or partitioned cache.
+     */
+    private void runQuery(IgniteEx ign, int iters, boolean repl) {
+        for (int i = 0; i < iters; ++i)
+            sql(ign, repl ? REPL_SCHEMA : PART_SCHEMA, "SELECT * FROM test").getAll();
+    }
+
+    /**
+     * @param ign Ignite instance.
+     * @param schema Schema name.
+     * @param sql SQL query.
+     * @param args Query parameters.
+     * @return Results cursor.
+     */
+    private FieldsQueryCursor<List<?>> sql(IgniteEx ign, String schema, String sql, Object... args) {
+        return ign.context().query().querySqlFields(new SqlFieldsQuery(sql)
+            .setSchema(schema)
+            .setArgs(args), false);
+    }
+
+    /**
+     * @param ign Ignite instance.
+     * @return Count of entries in the group reservations cache of the node's PartitionReservationManager.
+     */
+    private static int reservationCount(IgniteEx ign) {
+        IgniteH2Indexing idx = (IgniteH2Indexing)ign.context().query().getIndexing();
+
+        Map<?, ?> reservations = GridTestUtils.getFieldValue(idx.partitionReservationManager(), "reservations");
+
+        return reservations.size();
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
index a3223df..00a7dea 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicOneNodeTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
+import org.apache.ignite.internal.processors.query.MemLeakOnSqlWithClientReconnectTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -63,7 +64,8 @@ import org.junit.runners.Suite;
CacheContinuousWithTransformerRandomOperationsTest.class,
CacheContinuousQueryRandomOperationsTest.class,
StaticCacheDdlTest.class,
- StaticCacheDdlKeepStaticConfigurationTest.class
+ StaticCacheDdlKeepStaticConfigurationTest.class,
+ MemLeakOnSqlWithClientReconnectTest.class,
})
public class IgniteCacheQuerySelfTestSuite6 {
}