You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2019/02/14 17:50:07 UTC
[ignite] branch master updated: IGNITE-1126 Fix of JVM Crashes on
TeamCity, correctly checking of stop in exchange future;
added assertion for stopping of Page Memory. - Fixes #6092.
This is an automated email from the ASF dual-hosted git repository.
dpavlov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3a77531 IGNITE-1126 Fix of JVM Crashes on TeamCity, correctly checking of stop in exchange future; added assertion for stopping of Page Memory. - Fixes #6092.
3a77531 is described below
commit 3a775313747211869cba9bb828cde87b8fb7c75f
Author: EdShangGG <es...@gridgain.com>
AuthorDate: Thu Feb 14 20:49:28 2019 +0300
IGNITE-1126 Fix of JVM Crashes on TeamCity, correctly checking of stop in exchange future; added assertion for stopping of Page Memory. - Fixes #6092.
Signed-off-by: Dmitriy Pavlov <dp...@apache.org>
---
.../pagemem/impl/PageMemoryNoStoreImpl.java | 29 ++++++++
.../cache/CacheAffinitySharedManager.java | 7 +-
.../cache/GridCachePartitionExchangeManager.java | 3 +-
.../dht/GridPartitionedSingleGetFuture.java | 4 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 84 +++++++++++++---------
.../cache/persistence/pagemem/PageMemoryImpl.java | 40 +++++++++++
6 files changed, 129 insertions(+), 38 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
index cb22446..12db21c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
@@ -165,6 +165,11 @@ public class PageMemoryNoStoreImpl implements PageMemory {
private final GridCacheSharedContext<?, ?> ctx;
/**
+ * Marker that stop was invoked and the memory must not be used anymore.
+ */
+ private volatile boolean stopped;
+
+ /**
* @param log Logger.
* @param directMemoryProvider Memory allocator to use.
* @param sharedCtx Cache shared context.
@@ -203,6 +208,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
/** {@inheritDoc} */
@Override public void start() throws IgniteException {
+ stopped = false;
+
long startSize = dataRegionCfg.getInitialSize();
long maxSize = dataRegionCfg.getMaxSize();
@@ -243,6 +250,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
if (log.isDebugEnabled())
log.debug("Stopping page memory.");
+ stopped = true;
+
directMemoryProvider.shutdown(deallocate);
if (directMemoryProvider instanceof Closeable) {
@@ -262,6 +271,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
/** {@inheritDoc} */
@Override public long allocatePage(int grpId, int partId, byte flags) {
+ assert !stopped;
+
long relPtr = borrowFreePage();
long absPtr = 0;
@@ -324,6 +335,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
/** {@inheritDoc} */
@Override public boolean freePage(int cacheId, long pageId) {
+ assert !stopped;
+
releaseFreePage(pageId);
return true;
@@ -448,6 +461,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
/** {@inheritDoc} */
@Override public long acquirePage(int cacheId, long pageId, IoStatisticsHolder statHolder) {
+ assert !stopped;
+
int pageIdx = PageIdUtils.pageIndex(pageId);
Segment seg = segment(pageIdx);
@@ -461,6 +476,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
/** {@inheritDoc} */
@Override public void releasePage(int cacheId, long pageId, long page) {
+ assert !stopped;
+
if (trackAcquiredPages) {
Segment seg = segment(PageIdUtils.pageIndex(pageId));
@@ -470,6 +487,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
/** {@inheritDoc} */
@Override public long readLock(int cacheId, long pageId, long page) {
+ assert !stopped;
+
if (rwLock.readLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId)))
return page + PAGE_OVERHEAD;
@@ -478,6 +497,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
/** {@inheritDoc} */
@Override public long readLockForce(int cacheId, long pageId, long page) {
+ assert !stopped;
+
if (rwLock.readLock(page + LOCK_OFFSET, -1))
return page + PAGE_OVERHEAD;
@@ -486,11 +507,15 @@ public class PageMemoryNoStoreImpl implements PageMemory {
/** {@inheritDoc} */
@Override public void readUnlock(int cacheId, long pageId, long page) {
+ assert !stopped;
+
rwLock.readUnlock(page + LOCK_OFFSET);
}
/** {@inheritDoc} */
@Override public long writeLock(int cacheId, long pageId, long page) {
+ assert !stopped;
+
if (rwLock.writeLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId)))
return page + PAGE_OVERHEAD;
@@ -499,6 +524,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
/** {@inheritDoc} */
@Override public long tryWriteLock(int cacheId, long pageId, long page) {
+ assert !stopped;
+
if (rwLock.tryWriteLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId)))
return page + PAGE_OVERHEAD;
@@ -513,6 +540,8 @@ public class PageMemoryNoStoreImpl implements PageMemory {
Boolean walPlc,
boolean dirtyFlag
) {
+ assert !stopped;
+
long actualId = PageIO.getPageId(page + PAGE_OVERHEAD);
rwLock.writeUnlock(page + LOCK_OFFSET, PageIdUtils.tag(actualId));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index ed0b7be..64524d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1545,7 +1545,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert grp != null;
if (affReq != null && affReq.contains(aff.groupId())) {
- assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
+ assert AffinityTopologyVersion.NONE.equals(aff.lastVersion()) : aff.lastVersion();
CacheGroupAffinityMessage affMsg = receivedAff.get(aff.groupId());
@@ -1553,7 +1553,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, evts.discoveryCache());
- assert resTopVer.equals(evts.topologyVersion());
+ assert resTopVer.equals(evts.topologyVersion()) : "resTopVer=" + resTopVer +
+ ", evts.topVer=" + evts.topologyVersion();
List<List<ClusterNode>> idealAssign =
affMsg.createIdealAssignments(nodesByOrder, evts.discoveryCache());
@@ -1561,7 +1562,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (idealAssign != null)
aff.idealAssignment(idealAssign);
else {
- assert !aff.centralizedAffinityFunction();
+ assert !aff.centralizedAffinityFunction() : aff;
// Calculate ideal assignments.
aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d8ec478..553af8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNeedReconnectException;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
@@ -761,7 +762,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
stopErr = cctx.kernalContext().clientDisconnected() ?
new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(),
"Client node disconnected: " + cctx.igniteInstanceName()) :
- new IgniteCheckedException("Node is stopping: " + cctx.igniteInstanceName());
+ new NodeStoppingException("Node is stopping: " + cctx.igniteInstanceName());
// Stop exchange worker
U.cancel(exchWorker);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 56b80f6..99155dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -31,6 +31,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteDiagnosticAware;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -893,7 +894,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
if (trackable)
cctx.mvcc().removeFuture(futId);
- cctx.dht().sendTtlUpdateRequest(expiryPlc);
+ if (!(err instanceof NodeStoppingException))
+ cctx.dht().sendTtlUpdateRequest(expiryPlc);
return true;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 343ad3f..11a9ffe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -196,7 +196,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** Cache context. */
private final GridCacheSharedContext<?, ?> cctx;
- /** Busy lock to prevent activities from accessing exchanger while it's stopping. */
+ /**
+ * Busy lock to prevent activities from accessing the exchanger while it is stopping. Stopping takes the write lock,
+ * so every subsequent {@link #enterBusy()} call fails and returns {@code false}; regular operations share the read lock.
+ */
private ReadWriteLock busyLock;
/** */
@@ -653,7 +656,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * @return {@code true} if entered to busy state.
+ * @return {@code true} if the busy state was entered, {@code false} if the node is stopping.
*/
private boolean enterBusy() {
if (busyLock.readLock().tryLock())
@@ -3132,7 +3135,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
try {
// Reserve at least 2 threads for system operations.
- U.doInParallel(
+ doInParallel(
U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2),
cctx.kernalContext().getSystemExecutorService(),
cctx.cache().cacheGroups(),
@@ -3312,6 +3315,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param sndResNodes Additional nodes to send finish message to.
*/
private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> sndResNodes) {
+ if (isDone() || !enterBusy())
+ return;
+
try {
if (!F.isEmpty(exchangeGlobalExceptions) && dynamicCacheStartExchange() && isRollbackSupported()) {
sendExchangeFailureMessage();
@@ -3352,7 +3358,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, true);
- U.doInParallel(
+ doInParallel(
parallelismLvl,
cctx.kernalContext().getSystemExecutorService(),
cctx.affinity().cacheGroups().values(),
@@ -3375,37 +3381,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = new ConcurrentHashMap<>(cctx.cache().cacheGroups().size());
- U.doInParallel(
+ doInParallel(
parallelismLvl,
cctx.kernalContext().getSystemExecutorService(),
- msgs.entrySet(),
- entry -> {
- GridDhtPartitionsSingleMessage msg = entry.getValue();
-
- for (Map.Entry<Integer, GridDhtPartitionMap> e : msg.partitions().entrySet()) {
- Integer grpId = e.getKey();
-
- CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
-
- GridDhtPartitionTopology top = grp != null
- ? grp.topology()
- : cctx.exchange().clientTopology(grpId, events().discoveryCache());
-
- CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId, top.partitions());
-
- if (cntrs != null)
- top.collectUpdateCounters(cntrs);
- }
-
- Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
-
- if (affReq != null)
- CacheGroupAffinityMessage.createAffinityMessages(
- cctx,
- resTopVer,
- affReq,
- joinedNodeAff
- );
+ msgs.values(),
+ msg -> {
+ processSingleMessageOnCrdFinish(msg, joinedNodeAff);
return null;
}
@@ -3611,6 +3592,43 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
else
onDone(e);
}
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /**
+ * @param msg Single message to process.
+ * @param messageAccumulator Map that accumulates affinity messages to be sent later.
+ */
+ private void processSingleMessageOnCrdFinish(
+ GridDhtPartitionsSingleMessage msg,
+ Map<Integer, CacheGroupAffinityMessage> messageAccumulator
+ ) {
+ for (Map.Entry<Integer, GridDhtPartitionMap> e : msg.partitions().entrySet()) {
+ Integer grpId = e.getKey();
+
+ CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+ GridDhtPartitionTopology top = grp != null
+ ? grp.topology()
+ : cctx.exchange().clientTopology(grpId, events().discoveryCache());
+
+ CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId, top.partitions());
+
+ if (cntrs != null)
+ top.collectUpdateCounters(cntrs);
+ }
+
+ Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+
+ if (affReq != null)
+ CacheGroupAffinityMessage.createAffinityMessages(
+ cctx,
+ exchCtx.events().topologyVersion(),
+ affReq,
+ messageAccumulator
+ );
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index 4b236d8..0f4ca7d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -281,6 +281,11 @@ public class PageMemoryImpl implements PageMemoryEx {
private DataRegionMetricsImpl memMetrics;
/**
+ * Marker that stop was invoked and the memory must not be used anymore.
+ */
+ private volatile boolean stopped;
+
+ /**
* @param directMemoryProvider Memory allocator to use.
* @param sizes segments sizes, last is checkpoint pool size.
* @param ctx Cache shared context.
@@ -342,6 +347,8 @@ public class PageMemoryImpl implements PageMemoryEx {
/** {@inheritDoc} */
@Override public void start() throws IgniteException {
+ stopped = false;
+
directMemoryProvider.initialize(sizes);
List<DirectMemoryRegion> regions = new ArrayList<>(sizes.length);
@@ -416,11 +423,15 @@ public class PageMemoryImpl implements PageMemoryEx {
seg.close();
}
+ stopped = true;
+
directMemoryProvider.shutdown(deallocate);
}
/** {@inheritDoc} */
@Override public void releasePage(int grpId, long pageId, long page) {
+ assert !stopped;
+
Segment seg = segment(grpId, pageId);
seg.readLock().lock();
@@ -435,43 +446,59 @@ public class PageMemoryImpl implements PageMemoryEx {
/** {@inheritDoc} */
@Override public long readLock(int grpId, long pageId, long page) {
+ assert !stopped;
+
return readLock(page, pageId, false);
}
/** {@inheritDoc} */
@Override public void readUnlock(int grpId, long pageId, long page) {
+ assert !stopped;
+
readUnlockPage(page);
}
/** {@inheritDoc} */
@Override public long writeLock(int grpId, long pageId, long page) {
+ assert !stopped;
+
return writeLock(grpId, pageId, page, false);
}
/** {@inheritDoc} */
@Override public long writeLock(int grpId, long pageId, long page, boolean restore) {
+ assert !stopped;
+
return writeLockPage(page, new FullPageId(pageId, grpId), !restore);
}
/** {@inheritDoc} */
@Override public long tryWriteLock(int grpId, long pageId, long page) {
+ assert !stopped;
+
return tryWriteLockPage(page, new FullPageId(pageId, grpId), true);
}
/** {@inheritDoc} */
@Override public void writeUnlock(int grpId, long pageId, long page, Boolean walPlc,
boolean dirtyFlag) {
+ assert !stopped;
+
writeUnlock(grpId, pageId, page, walPlc, dirtyFlag, false);
}
/** {@inheritDoc} */
@Override public void writeUnlock(int grpId, long pageId, long page, Boolean walPlc,
boolean dirtyFlag, boolean restore) {
+ assert !stopped;
+
writeUnlockPage(page, new FullPageId(pageId, grpId), walPlc, dirtyFlag, restore);
}
/** {@inheritDoc} */
@Override public boolean isDirty(int grpId, long pageId, long page) {
+ assert !stopped;
+
return isDirty(page);
}
@@ -481,6 +508,7 @@ public class PageMemoryImpl implements PageMemoryEx {
flags == PageIdAllocator.FLAG_IDX && partId == PageIdAllocator.INDEX_PARTITION :
"flags = " + flags + ", partId = " + partId;
+ assert !stopped;
assert stateChecker.checkpointLockIsHeldByThread();
if (isThrottlingEnabled())
@@ -636,11 +664,15 @@ public class PageMemoryImpl implements PageMemoryEx {
/** {@inheritDoc} */
@Override public long metaPageId(int grpId) throws IgniteCheckedException {
+ assert !stopped;
+
return storeMgr.metaPageId(grpId);
}
/** {@inheritDoc} */
@Override public long partitionMetaPageId(int grpId, int partId) throws IgniteCheckedException {
+ assert !stopped;
+
return PageIdUtils.pageId(partId, PageIdAllocator.FLAG_DATA, 0);
}
@@ -652,12 +684,16 @@ public class PageMemoryImpl implements PageMemoryEx {
/** {@inheritDoc} */
@Override public long acquirePage(int grpId, long pageId,
IoStatisticsHolder statHolder) throws IgniteCheckedException {
+ assert !stopped;
+
return acquirePage(grpId, pageId, statHolder, false);
}
/** {@inheritDoc} */
@Override public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder,
boolean restore) throws IgniteCheckedException {
+ assert !stopped;
+
FullPageId fullId = new FullPageId(pageId, grpId);
int partId = PageIdUtils.partId(pageId);
@@ -1402,6 +1438,8 @@ public class PageMemoryImpl implements PageMemoryEx {
/** {@inheritDoc} */
@Override public long readLock(long absPtr, long pageId, boolean force, boolean touch) {
+ assert !stopped;
+
int tag = force ? -1 : PageIdUtils.tag(pageId);
boolean locked = rwLock.readLock(absPtr + PAGE_LOCK_OFFSET, tag);
@@ -1419,6 +1457,8 @@ public class PageMemoryImpl implements PageMemoryEx {
/** {@inheritDoc} */
@Override public long readLockForce(int grpId, long pageId, long page) {
+ assert !stopped;
+
return readLock(page, pageId, true);
}