You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by ir...@apache.org on 2019/12/10 12:22:25 UTC

[ignite] branch master updated: IGNITE-12365 Concurrent removeAll() on the same cache leads to deadlock - Fixes #7111.

This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b694fd1  IGNITE-12365 Concurrent removeAll() on the same cache leads to deadlock - Fixes #7111.
b694fd1 is described below

commit b694fd103354d827800b3e830ad5619bd27574b0
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Tue Dec 10 15:22:09 2019 +0300

    IGNITE-12365 Concurrent removeAll() on the same cache leads to deadlock - Fixes #7111.
    
    Signed-off-by: Ivan Rakov <ir...@apache.org>
---
 .../processors/cache/GridCacheContext.java         |  13 +++
 .../distributed/GridDistributedCacheAdapter.java   | 106 ++++++++++++++++-----
 .../apache/ignite/cache/RemoveAllDeadlockTest.java |  95 ++++++++++++++++++
 .../ignite/testsuites/IgniteReproducingSuite.java  |   5 +-
 4 files changed, 193 insertions(+), 26 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 9ddafb2..e2227e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -35,6 +35,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.expiry.EternalExpiryPolicy;
@@ -53,6 +54,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
@@ -63,6 +65,7 @@ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -282,6 +285,9 @@ public class GridCacheContext<K, V> implements Externalizable {
     private final boolean disableTriggeringCacheInterceptorOnConflict =
         Boolean.parseBoolean(System.getProperty(IGNITE_DISABLE_TRIGGERING_CACHE_INTERCEPTOR_ON_CONFLICT, "false"));
 
+    /** Last remove all job future. */
+    private AtomicReference<IgniteInternalFuture<Boolean>> lastRmvAllJobFut = new AtomicReference<>();
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -2339,6 +2345,13 @@ public class GridCacheContext<K, V> implements Externalizable {
             contQryMgr.notifyContinuousQueries(tx) && !F.isEmpty(contQryMgr.updateListeners(false, false));
     }
 
+    /**
+     * @return Holder of the future of the last started {@link GridDistributedCacheAdapter.GlobalRemoveAllJob}.
+     */
+    public AtomicReference<IgniteInternalFuture<Boolean>> lastRemoveAllJobFut() {
+        return lastRmvAllJobFut;
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeString(out, igniteInstanceName());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index ce9e71e..a3afa85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CachePeekMode;
@@ -31,8 +32,8 @@ import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.compute.ComputeJobResultPolicy;
 import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
@@ -401,6 +402,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         /** Keep binary flag. */
         private final boolean keepBinary;
 
+        /** Completion future of this job; published to the cache context so superseded jobs can wait on it. */
+        private transient GridFutureAdapter<Boolean> locFut;
+
         /**
          * @param cacheName Cache name.
          * @param topVer Topology version.
@@ -420,18 +424,37 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
         /** {@inheritDoc} */
         @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache0) {
-            GridCacheAdapter cache = ((IgniteKernal) ignite).context().cache().internalCache(cacheName);
+            if (locFut == null)
+                locFut = new GridFutureAdapter<>();
+
+            if (locFut.isDone()) {
+                if (locFut.isFailed())
+                    throw U.convertException((IgniteCheckedException)locFut.error());
+
+                try {
+                    return locFut.get();
+                }
+                catch (IgniteCheckedException ignored) {
+                    // Should be never thrown.
+                }
+            }
+
+            GridCacheAdapter<K, V> cache = ((IgniteEx)ignite).context().cache().internalCache(cacheName);
 
             if (cache == null)
-                return true;
+                return completeWithResult(true);
 
             final GridCacheContext<K, V> ctx = cache.context();
 
             ctx.gate().enter();
 
+            final AtomicReference<IgniteInternalFuture<Boolean>> refToLastFut = ctx.lastRemoveAllJobFut();
+
+            refToLastFut.set(locFut);
+
             try {
                 if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
-                    return false; // Ignore this remove request because remove request will be sent again.
+                    return completeWithResult(false); // Ignore this remove request because remove request will be sent again.
 
                 GridDhtCacheAdapter<K, V> dht;
                 GridNearCacheAdapter<K, V> near = null;
@@ -443,36 +466,60 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                 else
                     dht = (GridDhtCacheAdapter<K, V>) cache;
 
-                try (DataStreamerImpl<KeyCacheObject, Object> dataLdr =
-                         (DataStreamerImpl) ignite.dataStreamer(cacheName)) {
-                    ((DataStreamerImpl) dataLdr).maxRemapCount(0);
+                try (DataStreamerImpl dataLdr = (DataStreamerImpl) ignite.dataStreamer(cacheName)) {
+                    dataLdr.maxRemapCount(0);
 
                     dataLdr.skipStore(skipStore);
                     dataLdr.keepBinary(keepBinary);
 
-                    dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
+                    dataLdr.receiver(DataStreamerCacheUpdaters.batched());
 
                     for (int part : ctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)) {
-                        GridDhtLocalPartition locPart = dht.topology().localPartition(part, topVer, false);
+                        //Synchronization is needed for case when job hasn't completed removing for one partition yet
+                        // while concurrent job changed last job future and started removing
+                        synchronized (refToLastFut) {
+                            IgniteInternalFuture<Boolean> lastFut = ctx.lastRemoveAllJobFut().get();
+
+                            if (lastFut != locFut) {
+                                lastFut.listen((IgniteInClosure<IgniteInternalFuture<Boolean>>)fut -> {
+                                    if (lastFut.error() != null)
+                                        locFut.onDone(lastFut.error());
+                                    else {
+                                        try {
+                                            completeWithResult(fut.get());
+                                        }
+                                        catch (IgniteCheckedException ignored) {
+                                            // Should be never thrown.
+                                        }
+                                    }
+
+                                    jobCtx.callcc();
+                                });
+
+                                return jobCtx.holdcc();
+                            }
 
-                        if (locPart == null || (ctx.rebalanceEnabled() && locPart.state() != OWNING) || !locPart.reserve())
-                            return false;
+                            GridDhtLocalPartition locPart = dht.topology().localPartition(part, topVer, false);
 
-                        try {
-                            GridCloseableIterator<KeyCacheObject> iter = dht.context().offheap().cacheKeysIterator(ctx.cacheId(), part);
+                            if (locPart == null || (ctx.rebalanceEnabled() && locPart.state() != OWNING) || !locPart.reserve())
+                                return completeWithResult(false);
 
-                            if (iter != null) {
-                                try {
-                                    while (iter.hasNext())
-                                        dataLdr.removeDataInternal(iter.next());
-                                }
-                                finally {
-                                    iter.close();
+                            try {
+                                GridCloseableIterator<KeyCacheObject> iter = dht.context().offheap().cacheKeysIterator(ctx.cacheId(), part);
+
+                                if (iter != null) {
+                                    try {
+                                        while (iter.hasNext())
+                                            dataLdr.removeDataInternal(iter.next());
+                                    }
+                                    finally {
+                                        iter.close();
+                                    }
                                 }
                             }
-                        }
-                        finally {
-                            locPart.release();
+                            finally {
+                                locPart.release();
+                            }
                         }
                     }
                 }
@@ -485,15 +532,26 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                             near.removeEntry(e);
                     }
                 }
+
+                return completeWithResult(true);
             }
             catch (IgniteCheckedException e) {
+                locFut.onDone(e);
+
                 throw U.convertException(e);
             }
             finally {
                 ctx.gate().leave();
             }
+        }
 
-            return true;
+        /**
+         * Completes {@link #locFut} with the given outcome and hands the same value back to the caller.
+         */
+        private boolean completeWithResult(boolean res) {
+            locFut.onDone(res);
+
+            return res;
+        }
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/RemoveAllDeadlockTest.java b/modules/core/src/test/java/org/apache/ignite/cache/RemoveAllDeadlockTest.java
new file mode 100644
index 0000000..267a9f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/RemoveAllDeadlockTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * This test is needed for reproducing possible deadlock on concurrent {@link IgniteCache#removeAll()}
+ */
+public class RemoveAllDeadlockTest extends GridCommonAbstractTest {
+    /** Threads number for reproducing deadlock. */
+    public static final int THREADS = 4;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRemoveAllAtomicPartitioned() throws Exception {
+        startGrid(1);
+
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+
+        cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+
+        cacheCfg.setBackups(1);
+
+        IgniteCache<Integer, Integer> cache = grid(1).getOrCreateCache(cacheCfg);
+
+        removeAllConcurrent(cache);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRemoveAllAtomicReplicated() throws Exception {
+        startGrid(1);
+
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+        cacheCfg.setCacheMode(CacheMode.REPLICATED);
+
+        cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+
+        cacheCfg.setBackups(0);
+
+        IgniteCache<Integer, Integer> cache = grid(1).getOrCreateCache(cacheCfg);
+
+        removeAllConcurrent(cache);
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void removeAllConcurrent(IgniteCache<Integer, Integer> cache) throws Exception {
+        multithreaded(() -> {
+            for (int i = 0; i < 1234; i++) {
+                final int c = i % 123;
+
+                if (c % 15 != 0) {
+
+                    for (int j = i; j < c + i; j++)
+                        cache.put(j, j * c);
+                }
+                else
+                    cache.removeAll();
+            }
+        }, THREADS);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
index 3696cf9..5bc41f4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.ignite.cache.RemoveAllDeadlockTest;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -48,8 +49,8 @@ public class IgniteReproducingSuite {
             suite.add(IgniteReproducingSuite.TestStub.class);
 
             //uncomment to add some test
-            //for (int i = 0; i < 100; i++)
-            //    suite.add(IgniteCheckpointDirtyPagesForLowLoadTest.class);
+            for (int i = 0; i < 50; i++)
+                suite.add(RemoveAllDeadlockTest.class);
 
             return suite;
         }