You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/01/13 10:07:46 UTC
[ignite] branch master updated: IGNITE-13960 fix starvation in
management pool caused by MetadataTask execution (#8647)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 7ce07a6 IGNITE-13960 fix starvation in management pool caused by MetadataTask execution (#8647)
7ce07a6 is described below
commit 7ce07a6ae92cd1901f91859d46eab2e0bbd19ffe
Author: pvinokurov <vi...@gmail.com>
AuthorDate: Wed Jan 13 13:07:25 2021 +0300
IGNITE-13960 fix starvation in management pool caused by MetadataTask execution (#8647)
---
.../cache/query/GridCacheQueryManager.java | 79 +++++++++-----
.../handlers/cache/GridCacheCommandHandler.java | 62 +++++++----
.../cache/GridCacheMetadataCommandTest.java | 113 +++++++++++++++++++++
.../testsuites/IgniteRestHandlerTestSuite.java | 2 +
4 files changed, 213 insertions(+), 43 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 07d906b..f0d0552 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -100,9 +100,11 @@ import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.GridSpiCloseableIteratorWrapper;
import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridIterator;
+import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
@@ -127,6 +129,7 @@ import org.apache.ignite.spi.IgniteSpiCloseableIterator;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.apache.ignite.spi.indexing.IndexingSpi;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_QUIET;
@@ -1841,12 +1844,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
- * Gets SQL metadata.
+ * Gets SQL metadata asynchronously.
*
- * @return SQL metadata.
+ * @return SQL metadata future.
* @throws IgniteCheckedException In case of error.
*/
- public Collection<GridCacheSqlMetadata> sqlMetadata() throws IgniteCheckedException {
+ public IgniteInternalFuture<Collection<GridCacheSqlMetadata>> sqlMetadataAsync() throws IgniteCheckedException {
if (!enterBusy())
throw new IllegalStateException("Failed to get metadata (grid is stopping).");
@@ -1867,39 +1870,65 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
// Get local metadata.
IgniteInternalFuture<Collection<CacheSqlMetadata>> locFut = cctx.closures().callLocalSafe(job, true);
- if (rmtFut != null)
- res.addAll(rmtFut.get());
-
res.add(locFut.get());
- Map<String, Collection<CacheSqlMetadata>> map = new HashMap<>();
+ if (rmtFut == null)
+ return new GridFinishedFuture<>(convertMetadata(res));
- for (Collection<CacheSqlMetadata> col : res) {
- for (CacheSqlMetadata meta : col) {
- String name = meta.cacheName();
+ return rmtFut.chain(new IgniteClosureX<IgniteInternalFuture<Collection<Collection<CacheSqlMetadata>>>, Collection<GridCacheSqlMetadata>>() {
+ @Override public Collection<GridCacheSqlMetadata> applyx(
+ IgniteInternalFuture<Collection<Collection<CacheSqlMetadata>>> fut) throws IgniteCheckedException {
+ res.addAll(fut.get());
- Collection<CacheSqlMetadata> cacheMetas = map.get(name);
+ return convertMetadata(res);
+ }
+ });
+ }
+ finally {
+ leaveBusy();
+ }
+ }
- if (cacheMetas == null)
- map.put(name, cacheMetas = new LinkedList<>());
+ /**
+ * Transforms collections of {@link CacheSqlMetadata} collected from nodes into a collection of {@link
+ * GridCacheSqlMetadata}.
+ *
+ * @param res Collections of metadata from nodes.
+ * @return Collection of aggregated metadata.
+ */
+ @NotNull private Collection<GridCacheSqlMetadata> convertMetadata(
+ Collection<Collection<CacheSqlMetadata>> res) {
+ Map<String, Collection<CacheSqlMetadata>> map = new HashMap<>();
- cacheMetas.add(meta);
- }
+ for (Collection<CacheSqlMetadata> col : res) {
+ for (CacheSqlMetadata meta : col) {
+ String name = meta.cacheName();
+
+ Collection<CacheSqlMetadata> cacheMetas = map.computeIfAbsent(name, k -> new LinkedList<>());
+
+ cacheMetas.add(meta);
}
+ }
- Collection<GridCacheSqlMetadata> col = new ArrayList<>(map.size());
+ Collection<GridCacheSqlMetadata> col = new ArrayList<>(map.size());
- // Metadata for current cache must be first in list.
- col.add(new CacheSqlMetadata(map.remove(cacheName)));
+ // Metadata for current cache must be first in list.
+ col.add(new CacheSqlMetadata(map.remove(cacheName)));
- for (Collection<CacheSqlMetadata> metas : map.values())
- col.add(new CacheSqlMetadata(metas));
+ for (Collection<CacheSqlMetadata> metas : map.values())
+ col.add(new CacheSqlMetadata(metas));
- return col;
- }
- finally {
- leaveBusy();
- }
+ return col;
+ }
+
+ /**
+ * Gets SQL metadata.
+ *
+ * @return SQL metadata.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public Collection<GridCacheSqlMetadata> sqlMetadata() throws IgniteCheckedException {
+ return sqlMetadataAsync().get();
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index 2e7ea36..22e9d51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -44,6 +44,7 @@ import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.GridKernalContext;
@@ -75,7 +76,9 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.JobContextResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -1109,34 +1112,57 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
@IgniteInstanceResource
private transient IgniteEx ignite;
+ /** Auto-inject job context. */
+ @JobContextResource
+ private transient ComputeJobContext jobCtx;
+
+ /** Metadata future. */
+ private transient IgniteInternalFuture<Collection<GridCacheSqlMetadata>> future;
+
+ /** Cache name. */
+ private transient String cacheName;
+
/** {@inheritDoc} */
@Override public Collection<GridCacheSqlMetadata> execute() {
- String cacheName = null;
+ try {
+ if (future == null) {
+ if (!ignite.cluster().active())
+ return Collections.emptyList();
- if (!ignite.cluster().active())
- return Collections.emptyList();
+ IgniteInternalCache<?, ?> cache = null;
- IgniteInternalCache<?, ?> cache = null;
+ if (!F.isEmpty(arguments())) {
+ cacheName = argument(0);
- if (!F.isEmpty(arguments())) {
- cacheName = argument(0);
+ cache = ignite.context().cache().publicCache(cacheName);
- cache = ignite.context().cache().publicCache(cacheName);
+ assert cache != null;
+ }
+ else {
+ IgniteCacheProxy<?, ?> pubCache = F.first(ignite.context().cache().publicCaches());
- assert cache != null;
- }
- else {
- IgniteCacheProxy<?, ?> pubCache = F.first(ignite.context().cache().publicCaches());
+ if (pubCache != null)
+ cache = pubCache.internalProxy();
+
+ if (cache == null)
+ return Collections.emptyList();
+ }
- if (pubCache != null)
- cache = pubCache.internalProxy();
+ future = cache.context().queries().sqlMetadataAsync();
- if (cache == null)
- return Collections.emptyList();
- }
+ jobCtx.holdcc();
- try {
- Collection<GridCacheSqlMetadata> metas = cache.context().queries().sqlMetadata();
+ future.listen(new IgniteInClosure<IgniteInternalFuture<Collection<GridCacheSqlMetadata>>>() {
+ @Override public void apply(IgniteInternalFuture<Collection<GridCacheSqlMetadata>> future) {
+ if (future.isDone())
+ jobCtx.callcc();
+ }
+ });
+
+ return null;
+ }
+
+ Collection<GridCacheSqlMetadata> metas = future.get();
if (cacheName != null) {
for (GridCacheSqlMetadata meta : metas)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheMetadataCommandTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheMetadataCommandTest.java
new file mode 100644
index 0000000..e13e16e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheMetadataCommandTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.rest.handlers.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.rest.GridRestCommand;
+import org.apache.ignite.internal.processors.rest.GridRestResponse;
+import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandler;
+import org.apache.ignite.internal.processors.rest.request.GridRestCacheRequest;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Cache metadata command tests.
+ */
+public class GridCacheMetadataCommandTest extends GridCommonAbstractTest {
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+ cfg.setManagementThreadPoolSize(2);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids(false);
+ }
+
+ /**
+ * <p>Requests the cache's metadata from multiple threads
+ * in order to detect starvation or deadlock in the management pool caused by calling other internal tasks within the
+ * metadata task.</p>
+ *
+ * <p>Steps to reproduce:</p>
+ * <ul>
+ * <li>Start a few server nodes with a small management pool size.</li>
+ * <li>Call the metadata task by requesting REST API from multiple threads.</li>
+ * <li>Check all requests have finished successfully.</li>
+ * </ul>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void cacheMetadataWithMultupleThreads() throws Exception {
+ int servers = 2;
+ int iterations = 1000;
+ int threads = 10;
+
+ startGrids(servers);
+
+ ExecutorService ex = Executors.newFixedThreadPool(threads);
+
+ try {
+ List<Future<?>> futures = new ArrayList<>();
+
+ for (int i = 0; i < threads; i++) {
+ futures.add(ex.submit(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ GridRestCommandHandler hnd = new GridCacheCommandHandler((grid(0)).context());
+
+ GridRestCacheRequest req = new GridRestCacheRequest();
+
+ req.command(GridRestCommand.CACHE_METADATA);
+ req.cacheName(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < iterations; i++) {
+ GridRestResponse resp = hnd.handleAsync(req).get();
+
+ assertEquals(GridRestResponse.STATUS_SUCCESS, resp.getSuccessStatus());
+ }
+
+ return null;
+ }
+ }));
+ }
+
+ for (Future<?> f : futures)
+ f.get(1, TimeUnit.MINUTES);
+ }
+ finally {
+ ex.shutdownNow();
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java
index 982fe25..07c593d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.rest.RestProcessorInitializationTes
import org.apache.ignite.internal.processors.rest.RestProtocolStartTest;
import org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheAtomicCommandHandlerSelfTest;
import org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheCommandHandlerSelfTest;
+import org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheMetadataCommandTest;
import org.apache.ignite.internal.processors.rest.handlers.log.GridLogCommandHandlerTest;
import org.apache.ignite.internal.processors.rest.handlers.query.GridQueryCommandHandlerTest;
import org.apache.ignite.internal.processors.rest.handlers.top.CacheTopologyCommandHandlerTest;
@@ -35,6 +36,7 @@ import org.junit.runners.Suite;
@Suite.SuiteClasses({
GridCacheCommandHandlerSelfTest.class,
GridCacheAtomicCommandHandlerSelfTest.class,
+ GridCacheMetadataCommandTest.class,
GridLogCommandHandlerTest.class,
GridQueryCommandHandlerTest.class,
CacheTopologyCommandHandlerTest.class,