You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/01/13 10:07:46 UTC

[ignite] branch master updated: IGNITE-13960 fix starvation in management pool caused by MetadataTask execution (#8647)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ce07a6  IGNITE-13960 fix starvation in management pool caused by MetadataTask execution (#8647)
7ce07a6 is described below

commit 7ce07a6ae92cd1901f91859d46eab2e0bbd19ffe
Author: pvinokurov <vi...@gmail.com>
AuthorDate: Wed Jan 13 13:07:25 2021 +0300

    IGNITE-13960 fix starvation in management pool caused by MetadataTask execution (#8647)
---
 .../cache/query/GridCacheQueryManager.java         |  79 +++++++++-----
 .../handlers/cache/GridCacheCommandHandler.java    |  62 +++++++----
 .../cache/GridCacheMetadataCommandTest.java        | 113 +++++++++++++++++++++
 .../testsuites/IgniteRestHandlerTestSuite.java     |   2 +
 4 files changed, 213 insertions(+), 43 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 07d906b..f0d0552 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -100,9 +100,11 @@ import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.GridSpiCloseableIteratorWrapper;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridIterator;
+import org.apache.ignite.internal.util.lang.IgniteClosureX;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -127,6 +129,7 @@ import org.apache.ignite.spi.IgniteSpiCloseableIterator;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
 import org.apache.ignite.spi.indexing.IndexingSpi;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_QUIET;
@@ -1841,12 +1844,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
-     * Gets SQL metadata.
+     * Gets SQL metadata asynchronously.
      *
-     * @return SQL metadata.
+     * @return SQL metadata future.
      * @throws IgniteCheckedException In case of error.
      */
-    public Collection<GridCacheSqlMetadata> sqlMetadata() throws IgniteCheckedException {
+    public IgniteInternalFuture<Collection<GridCacheSqlMetadata>> sqlMetadataAsync() throws IgniteCheckedException {
         if (!enterBusy())
             throw new IllegalStateException("Failed to get metadata (grid is stopping).");
 
@@ -1867,39 +1870,65 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             // Get local metadata.
             IgniteInternalFuture<Collection<CacheSqlMetadata>> locFut = cctx.closures().callLocalSafe(job, true);
 
-            if (rmtFut != null)
-                res.addAll(rmtFut.get());
-
             res.add(locFut.get());
 
-            Map<String, Collection<CacheSqlMetadata>> map = new HashMap<>();
+            if (rmtFut == null)
+                return new GridFinishedFuture<>(convertMetadata(res));
 
-            for (Collection<CacheSqlMetadata> col : res) {
-                for (CacheSqlMetadata meta : col) {
-                    String name = meta.cacheName();
+            return rmtFut.chain(new IgniteClosureX<IgniteInternalFuture<Collection<Collection<CacheSqlMetadata>>>, Collection<GridCacheSqlMetadata>>() {
+                @Override public Collection<GridCacheSqlMetadata> applyx(
+                    IgniteInternalFuture<Collection<Collection<CacheSqlMetadata>>> fut) throws IgniteCheckedException {
+                    res.addAll(fut.get());
 
-                    Collection<CacheSqlMetadata> cacheMetas = map.get(name);
+                    return convertMetadata(res);
+                }
+            });
+        }
+        finally {
+            leaveBusy();
+        }
+    }
 
-                    if (cacheMetas == null)
-                        map.put(name, cacheMetas = new LinkedList<>());
+    /**
+     * Transforms collections of {@link CacheSqlMetadata} collected from nodes into collection of {@link
+     * GridCacheSqlMetadata}.
+     *
+     * @param res collections of metadata from nodes.
+     * @return collection of aggregated metadata.
+     */
+    @NotNull private Collection<GridCacheSqlMetadata> convertMetadata(
+        Collection<Collection<CacheSqlMetadata>> res) {
+        Map<String, Collection<CacheSqlMetadata>> map = new HashMap<>();
 
-                    cacheMetas.add(meta);
-                }
+        for (Collection<CacheSqlMetadata> col : res) {
+            for (CacheSqlMetadata meta : col) {
+                String name = meta.cacheName();
+
+                Collection<CacheSqlMetadata> cacheMetas = map.computeIfAbsent(name, k -> new LinkedList<>());
+
+                cacheMetas.add(meta);
             }
+        }
 
-            Collection<GridCacheSqlMetadata> col = new ArrayList<>(map.size());
+        Collection<GridCacheSqlMetadata> col = new ArrayList<>(map.size());
 
-            // Metadata for current cache must be first in list.
-            col.add(new CacheSqlMetadata(map.remove(cacheName)));
+        // Metadata for current cache must be first in list.
+        col.add(new CacheSqlMetadata(map.remove(cacheName)));
 
-            for (Collection<CacheSqlMetadata> metas : map.values())
-                col.add(new CacheSqlMetadata(metas));
+        for (Collection<CacheSqlMetadata> metas : map.values())
+            col.add(new CacheSqlMetadata(metas));
 
-            return col;
-        }
-        finally {
-            leaveBusy();
-        }
+        return col;
+    }
+
+    /**
+     * Gets SQL metadata.
+     *
+     * @return SQL metadata.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public Collection<GridCacheSqlMetadata> sqlMetadata() throws IgniteCheckedException {
+        return sqlMetadataAsync().get();
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index 2e7ea36..22e9d51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -44,6 +44,7 @@ import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobContext;
 import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.compute.ComputeTaskAdapter;
 import org.apache.ignite.internal.GridKernalContext;
@@ -75,7 +76,9 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.JobContextResource;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -1109,34 +1112,57 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
         @IgniteInstanceResource
         private transient IgniteEx ignite;
 
+        /** Auto-inject job context. */
+        @JobContextResource
+        private transient ComputeJobContext jobCtx;
+
+        /** Metadata future. */
+        private transient IgniteInternalFuture<Collection<GridCacheSqlMetadata>> future;
+
+        /** Cache name. */
+        private transient String cacheName;
+
         /** {@inheritDoc} */
         @Override public Collection<GridCacheSqlMetadata> execute() {
-            String cacheName = null;
+            try {
+               if (future == null) {
+                    if (!ignite.cluster().active())
+                        return Collections.emptyList();
 
-            if (!ignite.cluster().active())
-                return Collections.emptyList();
+                    IgniteInternalCache<?, ?> cache = null;
 
-            IgniteInternalCache<?, ?> cache = null;
+                    if (!F.isEmpty(arguments())) {
+                        cacheName = argument(0);
 
-            if (!F.isEmpty(arguments())) {
-                cacheName = argument(0);
+                        cache = ignite.context().cache().publicCache(cacheName);
 
-                cache = ignite.context().cache().publicCache(cacheName);
+                        assert cache != null;
+                    }
+                    else {
+                        IgniteCacheProxy<?, ?> pubCache = F.first(ignite.context().cache().publicCaches());
 
-                assert cache != null;
-            }
-            else {
-                IgniteCacheProxy<?, ?> pubCache = F.first(ignite.context().cache().publicCaches());
+                        if (pubCache != null)
+                            cache = pubCache.internalProxy();
+
+                        if (cache == null)
+                            return Collections.emptyList();
+                    }
 
-                if (pubCache != null)
-                    cache = pubCache.internalProxy();
+                    future = cache.context().queries().sqlMetadataAsync();
 
-                if (cache == null)
-                    return Collections.emptyList();
-            }
+                    jobCtx.holdcc();
 
-            try {
-                Collection<GridCacheSqlMetadata> metas = cache.context().queries().sqlMetadata();
+                    future.listen(new IgniteInClosure<IgniteInternalFuture<Collection<GridCacheSqlMetadata>>>() {
+                        @Override public void apply(IgniteInternalFuture<Collection<GridCacheSqlMetadata>> future) {
+                            if (future.isDone())
+                                jobCtx.callcc();
+                        }
+                    });
+
+                    return null;
+                }
+
+                Collection<GridCacheSqlMetadata> metas = future.get();
 
                 if (cacheName != null) {
                     for (GridCacheSqlMetadata meta : metas)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheMetadataCommandTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheMetadataCommandTest.java
new file mode 100644
index 0000000..e13e16e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheMetadataCommandTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.rest.handlers.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.rest.GridRestCommand;
+import org.apache.ignite.internal.processors.rest.GridRestResponse;
+import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandler;
+import org.apache.ignite.internal.processors.rest.request.GridRestCacheRequest;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests for handling of the REST {@code CACHE_METADATA} command.
+ */
+public class GridCacheMetadataCommandTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+        cfg.setManagementThreadPoolSize(2);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids(false);
+    }
+
+    /**
+     * <p>Requests the cache metadata from multiple threads concurrently in order to detect starvation
+     * or deadlock in the management pool caused by running other internal tasks inside the
+     * metadata task.</p>
+     *
+     * <p>Scenario:</p>
+     * <ul>
+     *  <li>Start two server nodes with a management pool of size 2 (see {@link #getConfiguration(String)}).</li>
+     *  <li>Invoke the metadata command through the REST cache command handler from multiple threads.</li>
+     *  <li>Check that every request finishes successfully within the timeout.</li>
+     * </ul>
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void cacheMetadataWithMultipleThreads() throws Exception {
+        int servers = 2;
+        int iterations = 1000;
+        int threads = 10;
+
+        startGrids(servers);
+
+        ExecutorService ex = Executors.newFixedThreadPool(threads);
+
+        try {
+            List<Future<?>> futures = new ArrayList<>(threads);
+
+            for (int t = 0; t < threads; t++) {
+                futures.add(ex.submit(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        GridRestCommandHandler hnd = new GridCacheCommandHandler(grid(0).context());
+
+                        GridRestCacheRequest req = new GridRestCacheRequest();
+
+                        req.command(GridRestCommand.CACHE_METADATA);
+                        req.cacheName(DEFAULT_CACHE_NAME);
+
+                        for (int i = 0; i < iterations; i++) {
+                            GridRestResponse resp = hnd.handleAsync(req).get();
+
+                            assertEquals(GridRestResponse.STATUS_SUCCESS, resp.getSuccessStatus());
+                        }
+
+                        return null;
+                    }
+                }));
+            }
+
+            // A timeout here most likely means the management pool is starved or deadlocked.
+            for (Future<?> f : futures)
+                f.get(1, TimeUnit.MINUTES);
+        }
+        finally {
+            ex.shutdownNow();
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java
index 982fe25..07c593d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.rest.RestProcessorInitializationTes
 import org.apache.ignite.internal.processors.rest.RestProtocolStartTest;
 import org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheAtomicCommandHandlerSelfTest;
 import org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheCommandHandlerSelfTest;
+import org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheMetadataCommandTest;
 import org.apache.ignite.internal.processors.rest.handlers.log.GridLogCommandHandlerTest;
 import org.apache.ignite.internal.processors.rest.handlers.query.GridQueryCommandHandlerTest;
 import org.apache.ignite.internal.processors.rest.handlers.top.CacheTopologyCommandHandlerTest;
@@ -35,6 +36,7 @@ import org.junit.runners.Suite;
 @Suite.SuiteClasses({
     GridCacheCommandHandlerSelfTest.class,
     GridCacheAtomicCommandHandlerSelfTest.class,
+    GridCacheMetadataCommandTest.class,
     GridLogCommandHandlerTest.class,
     GridQueryCommandHandlerTest.class,
     CacheTopologyCommandHandlerTest.class,