You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2019/02/14 17:50:07 UTC

[ignite] branch master updated: IGNITE-1126 Fix of JVM Crashes on TeamCity, correctly cheking of stop in exchange future; added assertion for stopping of Page Memory. - Fixes #6092.

This is an automated email from the ASF dual-hosted git repository.

dpavlov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a77531  IGNITE-1126 Fix of JVM Crashes on TeamCity, correctly cheking of stop in exchange future; added assertion for stopping of Page Memory. - Fixes #6092.
3a77531 is described below

commit 3a775313747211869cba9bb828cde87b8fb7c75f
Author: EdShangGG <es...@gridgain.com>
AuthorDate: Thu Feb 14 20:49:28 2019 +0300

    IGNITE-1126 Fix of JVM Crashes on TeamCity, correctly cheking of stop in exchange future; added assertion for stopping of Page Memory. - Fixes #6092.
    
    Signed-off-by: Dmitriy Pavlov <dp...@apache.org>
---
 .../pagemem/impl/PageMemoryNoStoreImpl.java        | 29 ++++++++
 .../cache/CacheAffinitySharedManager.java          |  7 +-
 .../cache/GridCachePartitionExchangeManager.java   |  3 +-
 .../dht/GridPartitionedSingleGetFuture.java        |  4 +-
 .../preloader/GridDhtPartitionsExchangeFuture.java | 84 +++++++++++++---------
 .../cache/persistence/pagemem/PageMemoryImpl.java  | 40 +++++++++++
 6 files changed, 129 insertions(+), 38 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
index cb22446..12db21c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
@@ -165,6 +165,11 @@ public class PageMemoryNoStoreImpl implements PageMemory {
     private final GridCacheSharedContext<?, ?> ctx;
 
     /**
+     * Flag indicating that stop was invoked and this page memory must no longer be used.
+     */
+    private volatile boolean stopped;
+
+    /**
      * @param log Logger.
      * @param directMemoryProvider Memory allocator to use.
      * @param sharedCtx Cache shared context.
@@ -203,6 +208,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteException {
+        stopped = false;
+
         long startSize = dataRegionCfg.getInitialSize();
         long maxSize = dataRegionCfg.getMaxSize();
 
@@ -243,6 +250,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
         if (log.isDebugEnabled())
             log.debug("Stopping page memory.");
 
+        stopped = true;
+
         directMemoryProvider.shutdown(deallocate);
 
         if (directMemoryProvider instanceof Closeable) {
@@ -262,6 +271,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
 
     /** {@inheritDoc} */
     @Override public long allocatePage(int grpId, int partId, byte flags) {
+        assert !stopped;
+
         long relPtr = borrowFreePage();
         long absPtr = 0;
 
@@ -324,6 +335,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
 
     /** {@inheritDoc} */
     @Override public boolean freePage(int cacheId, long pageId) {
+        assert !stopped;
+
         releaseFreePage(pageId);
 
         return true;
@@ -448,6 +461,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
 
     /** {@inheritDoc} */
     @Override public long acquirePage(int cacheId, long pageId, IoStatisticsHolder statHolder) {
+        assert !stopped;
+
         int pageIdx = PageIdUtils.pageIndex(pageId);
 
         Segment seg = segment(pageIdx);
@@ -461,6 +476,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
 
     /** {@inheritDoc} */
     @Override public void releasePage(int cacheId, long pageId, long page) {
+        assert !stopped;
+
         if (trackAcquiredPages) {
             Segment seg = segment(PageIdUtils.pageIndex(pageId));
 
@@ -470,6 +487,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
 
     /** {@inheritDoc} */
     @Override public long readLock(int cacheId, long pageId, long page) {
+        assert !stopped;
+
         if (rwLock.readLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId)))
             return page + PAGE_OVERHEAD;
 
@@ -478,6 +497,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
 
     /** {@inheritDoc} */
     @Override public long readLockForce(int cacheId, long pageId, long page) {
+        assert !stopped;
+
         if (rwLock.readLock(page + LOCK_OFFSET, -1))
             return page + PAGE_OVERHEAD;
 
@@ -486,11 +507,15 @@ public class PageMemoryNoStoreImpl implements PageMemory {
 
     /** {@inheritDoc} */
     @Override public void readUnlock(int cacheId, long pageId, long page) {
+        assert !stopped;
+
         rwLock.readUnlock(page + LOCK_OFFSET);
     }
 
     /** {@inheritDoc} */
     @Override public long writeLock(int cacheId, long pageId, long page) {
+        assert !stopped;
+
         if (rwLock.writeLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId)))
             return page + PAGE_OVERHEAD;
 
@@ -499,6 +524,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
 
     /** {@inheritDoc} */
     @Override public long tryWriteLock(int cacheId, long pageId, long page) {
+        assert !stopped;
+
         if (rwLock.tryWriteLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId)))
             return page + PAGE_OVERHEAD;
 
@@ -513,6 +540,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
         Boolean walPlc,
         boolean dirtyFlag
     ) {
+        assert !stopped;
+
         long actualId = PageIO.getPageId(page + PAGE_OVERHEAD);
 
         rwLock.writeUnlock(page + LOCK_OFFSET, PageIdUtils.tag(actualId));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index ed0b7be..64524d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1545,7 +1545,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 assert grp != null;
 
                 if (affReq != null && affReq.contains(aff.groupId())) {
-                    assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
+                    assert AffinityTopologyVersion.NONE.equals(aff.lastVersion()) : aff.lastVersion();
 
                     CacheGroupAffinityMessage affMsg = receivedAff.get(aff.groupId());
 
@@ -1553,7 +1553,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                     List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, evts.discoveryCache());
 
-                    assert resTopVer.equals(evts.topologyVersion());
+                    assert resTopVer.equals(evts.topologyVersion()) : "resTopVer=" + resTopVer +
+                        ", evts.topVer=" + evts.topologyVersion();
 
                     List<List<ClusterNode>> idealAssign =
                         affMsg.createIdealAssignments(nodesByOrder, evts.discoveryCache());
@@ -1561,7 +1562,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     if (idealAssign != null)
                         aff.idealAssignment(idealAssign);
                     else {
-                        assert !aff.centralizedAffinityFunction();
+                        assert !aff.centralizedAffinityFunction() : aff;
 
                         // Calculate ideal assignments.
                         aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d8ec478..553af8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNeedReconnectException;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
@@ -761,7 +762,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         stopErr = cctx.kernalContext().clientDisconnected() ?
             new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(),
                 "Client node disconnected: " + cctx.igniteInstanceName()) :
-            new IgniteCheckedException("Node is stopping: " + cctx.igniteInstanceName());
+            new NodeStoppingException("Node is stopping: " + cctx.igniteInstanceName());
 
         // Stop exchange worker
         U.cancel(exchWorker);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 56b80f6..99155dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -31,6 +31,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteDiagnosticAware;
 import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -893,7 +894,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
             if (trackable)
                 cctx.mvcc().removeFuture(futId);
 
-            cctx.dht().sendTtlUpdateRequest(expiryPlc);
+            if (!(err instanceof NodeStoppingException))
+                cctx.dht().sendTtlUpdateRequest(expiryPlc);
 
             return true;
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 343ad3f..11a9ffe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -196,7 +196,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** Cache context. */
     private final GridCacheSharedContext<?, ?> cctx;
 
-    /** Busy lock to prevent activities from accessing exchanger while it's stopping. */
+    /**
+     * Busy lock to prevent activities from accessing exchanger while it's stopping. Stopping takes the write lock, so
+     * every {@link #enterBusy()} call returns {@code false}. Regular operations may take the read lock many times.
+     */
     private ReadWriteLock busyLock;
 
     /** */
@@ -653,7 +656,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @return {@code true} if entered to busy state.
+     * @return {@code true} if the busy state was entered, {@code false} if the node is stopping.
      */
     private boolean enterBusy() {
         if (busyLock.readLock().tryLock())
@@ -3132,7 +3135,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         try {
             // Reserve at least 2 threads for system operations.
-            U.doInParallel(
+            doInParallel(
                 U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2),
                 cctx.kernalContext().getSystemExecutorService(),
                 cctx.cache().cacheGroups(),
@@ -3312,6 +3315,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param sndResNodes Additional nodes to send finish message to.
      */
     private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> sndResNodes) {
+        if (isDone() || !enterBusy())
+            return;
+
         try {
             if (!F.isEmpty(exchangeGlobalExceptions) && dynamicCacheStartExchange() && isRollbackSupported()) {
                 sendExchangeFailureMessage();
@@ -3352,7 +3358,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, true);
 
 
-                U.doInParallel(
+                doInParallel(
                     parallelismLvl,
                     cctx.kernalContext().getSystemExecutorService(),
                     cctx.affinity().cacheGroups().values(),
@@ -3375,37 +3381,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = new ConcurrentHashMap<>(cctx.cache().cacheGroups().size());
 
-            U.doInParallel(
+            doInParallel(
                 parallelismLvl,
                 cctx.kernalContext().getSystemExecutorService(),
-                msgs.entrySet(),
-                entry -> {
-                    GridDhtPartitionsSingleMessage msg = entry.getValue();
-
-                    for (Map.Entry<Integer, GridDhtPartitionMap> e : msg.partitions().entrySet()) {
-                        Integer grpId = e.getKey();
-
-                        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
-
-                        GridDhtPartitionTopology top = grp != null
-                            ? grp.topology()
-                            : cctx.exchange().clientTopology(grpId, events().discoveryCache());
-
-                        CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId, top.partitions());
-
-                        if (cntrs != null)
-                            top.collectUpdateCounters(cntrs);
-                    }
-
-                    Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
-
-                    if (affReq != null)
-                        CacheGroupAffinityMessage.createAffinityMessages(
-                            cctx,
-                            resTopVer,
-                            affReq,
-                            joinedNodeAff
-                        );
+                msgs.values(),
+                msg -> {
+                    processSingleMessageOnCrdFinish(msg, joinedNodeAff);
 
                     return null;
                 }
@@ -3611,6 +3592,43 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             else
                 onDone(e);
         }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * @param msg Single message to process.
+     * @param messageAccumulator Map that accumulates the affinity messages which need to be sent afterwards.
+     */
+    private void processSingleMessageOnCrdFinish(
+        GridDhtPartitionsSingleMessage msg,
+        Map<Integer, CacheGroupAffinityMessage> messageAccumulator
+    ) {
+        for (Map.Entry<Integer, GridDhtPartitionMap> e : msg.partitions().entrySet()) {
+            Integer grpId = e.getKey();
+
+            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+            GridDhtPartitionTopology top = grp != null
+                ? grp.topology()
+                : cctx.exchange().clientTopology(grpId, events().discoveryCache());
+
+            CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId, top.partitions());
+
+            if (cntrs != null)
+                top.collectUpdateCounters(cntrs);
+        }
+
+        Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+
+        if (affReq != null)
+            CacheGroupAffinityMessage.createAffinityMessages(
+                cctx,
+                exchCtx.events().topologyVersion(),
+                affReq,
+                messageAccumulator
+            );
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index 4b236d8..0f4ca7d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -281,6 +281,11 @@ public class PageMemoryImpl implements PageMemoryEx {
     private DataRegionMetricsImpl memMetrics;
 
     /**
+     * Flag indicating that stop was invoked and this page memory must no longer be used.
+     */
+    private volatile boolean stopped;
+
+    /**
      * @param directMemoryProvider Memory allocator to use.
      * @param sizes segments sizes, last is checkpoint pool size.
      * @param ctx Cache shared context.
@@ -342,6 +347,8 @@ public class PageMemoryImpl implements PageMemoryEx {
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteException {
+        stopped = false;
+
         directMemoryProvider.initialize(sizes);
 
         List<DirectMemoryRegion> regions = new ArrayList<>(sizes.length);
@@ -416,11 +423,15 @@ public class PageMemoryImpl implements PageMemoryEx {
                 seg.close();
         }
 
+        stopped = true;
+
         directMemoryProvider.shutdown(deallocate);
     }
 
     /** {@inheritDoc} */
     @Override public void releasePage(int grpId, long pageId, long page) {
+        assert !stopped;
+
         Segment seg = segment(grpId, pageId);
 
         seg.readLock().lock();
@@ -435,43 +446,59 @@ public class PageMemoryImpl implements PageMemoryEx {
 
     /** {@inheritDoc} */
     @Override public long readLock(int grpId, long pageId, long page) {
+        assert !stopped;
+
         return readLock(page, pageId, false);
     }
 
     /** {@inheritDoc} */
     @Override public void readUnlock(int grpId, long pageId, long page) {
+        assert !stopped;
+
         readUnlockPage(page);
     }
 
     /** {@inheritDoc} */
     @Override public long writeLock(int grpId, long pageId, long page) {
+        assert !stopped;
+
         return writeLock(grpId, pageId, page, false);
     }
 
     /** {@inheritDoc} */
     @Override public long writeLock(int grpId, long pageId, long page, boolean restore) {
+        assert !stopped;
+
         return writeLockPage(page, new FullPageId(pageId, grpId), !restore);
     }
 
     /** {@inheritDoc} */
     @Override public long tryWriteLock(int grpId, long pageId, long page) {
+        assert !stopped;
+
         return tryWriteLockPage(page, new FullPageId(pageId, grpId), true);
     }
 
     /** {@inheritDoc} */
     @Override public void writeUnlock(int grpId, long pageId, long page, Boolean walPlc,
         boolean dirtyFlag) {
+        assert !stopped;
+
         writeUnlock(grpId, pageId, page, walPlc, dirtyFlag, false);
     }
 
     /** {@inheritDoc} */
     @Override public void writeUnlock(int grpId, long pageId, long page, Boolean walPlc,
         boolean dirtyFlag, boolean restore) {
+        assert !stopped;
+
         writeUnlockPage(page, new FullPageId(pageId, grpId), walPlc, dirtyFlag, restore);
     }
 
     /** {@inheritDoc} */
     @Override public boolean isDirty(int grpId, long pageId, long page) {
+        assert !stopped;
+
         return isDirty(page);
     }
 
@@ -481,6 +508,7 @@ public class PageMemoryImpl implements PageMemoryEx {
             flags == PageIdAllocator.FLAG_IDX && partId == PageIdAllocator.INDEX_PARTITION :
             "flags = " + flags + ", partId = " + partId;
 
+        assert !stopped;
         assert stateChecker.checkpointLockIsHeldByThread();
 
         if (isThrottlingEnabled())
@@ -636,11 +664,15 @@ public class PageMemoryImpl implements PageMemoryEx {
 
     /** {@inheritDoc} */
     @Override public long metaPageId(int grpId) throws IgniteCheckedException {
+        assert !stopped;
+
         return storeMgr.metaPageId(grpId);
     }
 
     /** {@inheritDoc} */
     @Override public long partitionMetaPageId(int grpId, int partId) throws IgniteCheckedException {
+        assert !stopped;
+
         return PageIdUtils.pageId(partId, PageIdAllocator.FLAG_DATA, 0);
     }
 
@@ -652,12 +684,16 @@ public class PageMemoryImpl implements PageMemoryEx {
     /** {@inheritDoc} */
     @Override public long acquirePage(int grpId, long pageId,
         IoStatisticsHolder statHolder) throws IgniteCheckedException {
+        assert !stopped;
+
         return acquirePage(grpId, pageId, statHolder, false);
     }
 
     /** {@inheritDoc} */
     @Override public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder,
         boolean restore) throws IgniteCheckedException {
+        assert !stopped;
+
         FullPageId fullId = new FullPageId(pageId, grpId);
 
         int partId = PageIdUtils.partId(pageId);
@@ -1402,6 +1438,8 @@ public class PageMemoryImpl implements PageMemoryEx {
 
     /** {@inheritDoc} */
     @Override  public long readLock(long absPtr, long pageId, boolean force, boolean touch) {
+        assert !stopped;
+
         int tag = force ? -1 : PageIdUtils.tag(pageId);
 
         boolean locked = rwLock.readLock(absPtr + PAGE_LOCK_OFFSET, tag);
@@ -1419,6 +1457,8 @@ public class PageMemoryImpl implements PageMemoryEx {
 
     /** {@inheritDoc} */
     @Override public long readLockForce(int grpId, long pageId, long page) {
+        assert !stopped;
+
         return readLock(page, pageId, true);
     }