You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/02/26 10:22:05 UTC

[04/13] ignite git commit: IGNITE-2709 - Fixed potential SOE on high-contented cache locks - Fixes #509.

IGNITE-2709 - Fixed potential SOE on high-contented cache locks - Fixes #509.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a7b6ad36
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a7b6ad36
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a7b6ad36

Branch: refs/heads/ignite-1186
Commit: a7b6ad36dd9b8f91877c614ce6e72c83ec1ffdf6
Parents: 711fe37
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Feb 24 18:45:00 2016 -0800
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Feb 24 18:45:00 2016 -0800

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   3 +
 .../processors/cache/GridCacheMvccManager.java  | 142 ++++++++++++-------
 .../IgniteCachePutStackOverflowSelfTest.java    | 133 +++++++++++++++++
 .../testsuites/IgniteCacheTestSuite5.java       |   2 +
 4 files changed, 230 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a7b6ad36/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 6f07702..858cb71 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -367,6 +367,9 @@ public final class IgniteSystemProperties {
     /** JDBC driver cursor remove delay. */
     public static final String IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY = "IGNITE_JDBC_DRIVER_CURSOR_RMV_DELAY";
 
+    /** Maximum number of nested listener calls before listener notification becomes asynchronous. */
+    public static final String IGNITE_MAX_NESTED_LISTENER_CALLS = "IGNITE_MAX_NESTED_LISTENER_CALLS";
+
     /**
      * Manages {@link OptimizedMarshaller} behavior of {@code serialVersionUID} computation for
      * {@link Serializable} classes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7b6ad36/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 9aeed95..afba4bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -49,6 +50,7 @@ import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -64,6 +66,8 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_NESTED_LISTENER_CALLS;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
@@ -76,6 +80,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     /** Maxim number of removed locks. */
     private static final int MAX_REMOVED_LOCKS = 10240;
 
+    /** */
+    private static final int MAX_NESTED_LSNR_CALLS = getInteger(IGNITE_MAX_NESTED_LISTENER_CALLS, 5);
+
     /** Pending locks per thread. */
     private final ThreadLocal<Deque<GridCacheMvccCandidate>> pending = new ThreadLocal<>();
 
@@ -111,6 +118,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     /** Finish futures. */
     private final ConcurrentLinkedDeque8<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
 
+    /** Nested listener calls. */
+    private final ThreadLocal<Integer> nestedLsnrCalls = new ThreadLocal<Integer>() {
+        @Override protected Integer initialValue() {
+            return 0;
+        }
+    };
+
     /** Logger. */
     @SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
     private IgniteLogger exchLog;
@@ -123,60 +137,26 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     private final GridCacheMvccCallback cb = new GridCacheMvccCallback() {
         /** {@inheritDoc} */
         @SuppressWarnings({"unchecked"})
-        @Override public void onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate prev,
-            GridCacheMvccCandidate owner) {
-            assert entry != null;
-            assert owner != prev : "New and previous owner are identical instances: " + owner;
-            assert owner == null || prev == null || !owner.version().equals(prev.version()) :
-                "New and previous owners have identical versions [owner=" + owner + ", prev=" + prev + ']';
-
-            if (log.isDebugEnabled())
-                log.debug("Received owner changed callback [" + entry.key() + ", owner=" + owner + ", prev=" +
-                    prev + ']');
-
-            if (owner != null && (owner.local() || owner.nearLocal())) {
-                Collection<GridCacheMvccFuture<?>> futCol = mvccFuts.get(owner.version());
-
-                if (futCol != null) {
-                    ArrayList<GridCacheMvccFuture<?>> futColCp;
+        @Override public void onOwnerChanged(final GridCacheEntryEx entry, final GridCacheMvccCandidate prev,
+            final GridCacheMvccCandidate owner) {
+            int nested = nestedLsnrCalls.get();
 
-                    synchronized (futCol) {
-                        futColCp = new ArrayList<>(futCol.size());
+            if (nested < MAX_NESTED_LSNR_CALLS) {
+                nestedLsnrCalls.set(nested + 1);
 
-                        futColCp.addAll(futCol);
-                    }
-
-                    // Must invoke onOwnerChanged outside of synchronization block.
-                    for (GridCacheMvccFuture<?> fut : futColCp) {
-                        if (!fut.isDone()) {
-                            GridCacheMvccFuture<Boolean> mvccFut = (GridCacheMvccFuture<Boolean>)fut;
-
-                            // Since this method is called outside of entry synchronization,
-                            // we can safely invoke any method on the future.
-                            // Also note that we don't remove future here if it is done.
-                            // The removal is initiated from within future itself.
-                            if (mvccFut.onOwnerChanged(entry, owner))
-                                return;
-                        }
-                    }
+                try {
+                    notifyOwnerChanged(entry, prev, owner);
+                }
+                finally {
+                    nestedLsnrCalls.set(nested);
                 }
             }
-
-            if (log.isDebugEnabled())
-                log.debug("Lock future not found for owner change callback (will try transaction futures) [owner=" +
-                    owner + ", prev=" + prev + ", entry=" + entry + ']');
-
-            // If no future was found, delegate to transaction manager.
-            if (cctx.tm().onOwnerChanged(entry, owner)) {
-                if (log.isDebugEnabled())
-                    log.debug("Found transaction for changed owner: " + owner);
-            }
-            else if (log.isDebugEnabled())
-                log.debug("Failed to find transaction for changed owner: " + owner);
-
-            if (!finishFuts.isEmptyx()) {
-                for (FinishLockFuture f : finishFuts)
-                    f.recheck(entry);
+            else {
+                cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
+                    @Override public void run() {
+                        notifyOwnerChanged(entry, prev, owner);
+                    }
+                }, true);
             }
         }
 
@@ -197,6 +177,68 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         }
     };
 
+    /**
+     * @param entry Entry to notify callback for.
+     * @param prev Previous lock owner.
+     * @param owner Current lock owner.
+     */
+    private void notifyOwnerChanged(final GridCacheEntryEx entry, final GridCacheMvccCandidate prev,
+        final GridCacheMvccCandidate owner) {
+        assert entry != null;
+        assert owner != prev : "New and previous owner are identical instances: " + owner;
+        assert owner == null || prev == null || !owner.version().equals(prev.version()) :
+            "New and previous owners have identical versions [owner=" + owner + ", prev=" + prev + ']';
+
+        if (log.isDebugEnabled())
+            log.debug("Received owner changed callback [" + entry.key() + ", owner=" + owner + ", prev=" +
+                prev + ']');
+
+        if (owner != null && (owner.local() || owner.nearLocal())) {
+            Collection<GridCacheMvccFuture<?>> futCol = mvccFuts.get(owner.version());
+
+            if (futCol != null) {
+                ArrayList<GridCacheMvccFuture<?>> futColCp;
+
+                synchronized (futCol) {
+                    futColCp = new ArrayList<>(futCol.size());
+
+                    futColCp.addAll(futCol);
+                }
+
+                // Must invoke onOwnerChanged outside of synchronization block.
+                for (GridCacheMvccFuture<?> fut : futColCp) {
+                    if (!fut.isDone()) {
+                        final GridCacheMvccFuture<Boolean> mvccFut = (GridCacheMvccFuture<Boolean>)fut;
+
+                        // Since this method is called outside of entry synchronization,
+                        // we can safely invoke any method on the future.
+                        // Also note that we don't remove future here if it is done.
+                        // The removal is initiated from within future itself.
+                        if (mvccFut.onOwnerChanged(entry, owner))
+                            return;
+                    }
+                }
+            }
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Lock future not found for owner change callback (will try transaction futures) [owner=" +
+                owner + ", prev=" + prev + ", entry=" + entry + ']');
+
+        // If no future was found, delegate to transaction manager.
+        if (cctx.tm().onOwnerChanged(entry, owner)) {
+            if (log.isDebugEnabled())
+                log.debug("Found transaction for changed owner: " + owner);
+        }
+        else if (log.isDebugEnabled())
+            log.debug("Failed to find transaction for changed owner: " + owner);
+
+        if (!finishFuts.isEmptyx()) {
+            for (FinishLockFuture f : finishFuts)
+                f.recheck(entry);
+        }
+    }
+
     /** Discovery listener. */
     @GridToStringExclude private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7b6ad36/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutStackOverflowSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutStackOverflowSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutStackOverflowSelfTest.java
new file mode 100644
index 0000000..55d7192
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutStackOverflowSelfTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class IgniteCachePutStackOverflowSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrid(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStackLocal() throws Exception {
+        checkCache(CacheMode.LOCAL);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStackPartitioned() throws Exception {
+        checkCache(CacheMode.PARTITIONED);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStackReplicated() throws Exception {
+        checkCache(CacheMode.REPLICATED);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    private void checkCache(CacheMode mode) throws Exception {
+        final Ignite ignite = ignite(0);
+
+        final IgniteCache<Object, Object> cache = ignite.getOrCreateCache(new CacheConfiguration<>("cache")
+            .setCacheMode(mode)
+            .setAtomicityMode(TRANSACTIONAL));
+
+        try {
+            Thread[] threads = new Thread[256];
+
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                // Lock the key.
+                final String key = "key";
+
+                cache.get(key);
+
+                // Simulate high contention.
+                for (int i = 0; i < threads.length; i++) {
+                    threads[i] = new Thread() {
+                        @Override public void run() {
+                            cache.put(key, 1);
+                        }
+                    };
+
+                    threads[i].start();
+                }
+
+                U.sleep(2_000);
+
+                cache.put(key, 1);
+
+                tx.commit();
+            }
+
+            System.out.println("Waiting for threads to finish...");
+
+            for (Thread thread : threads)
+                thread.join();
+        }
+        finally {
+            ignite.destroyCache("cache");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7b6ad36/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index 34b12a9..3eb0b13 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.CacheNearReaderUpdateTest;
 import org.apache.ignite.internal.processors.cache.CacheSerializableTransactionsTest;
+import org.apache.ignite.internal.processors.cache.IgniteCachePutStackOverflowSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheStoreCollectionTest;
 import org.apache.ignite.internal.processors.cache.store.IgniteCacheWriteBehindNoUpdateSelfTest;
 
@@ -38,6 +39,7 @@ public class IgniteCacheTestSuite5 extends TestSuite {
         suite.addTestSuite(CacheNearReaderUpdateTest.class);
         suite.addTestSuite(IgniteCacheStoreCollectionTest.class);
         suite.addTestSuite(IgniteCacheWriteBehindNoUpdateSelfTest.class);
+        suite.addTestSuite(IgniteCachePutStackOverflowSelfTest.class);
 
         return suite;
     }