You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/10/23 09:05:31 UTC
ignite git commit: IGNITE-9911: MVCC: Fixed a hang during vacuum.
This closes #5035.
Repository: ignite
Updated Branches:
refs/heads/master 962d6a297 -> b1584a8f2
IGNITE-9911: MVCC: Fixed a hang during vacuum. This closes #5035.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1584a8f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1584a8f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1584a8f
Branch: refs/heads/master
Commit: b1584a8f2636d390b3f8cc754ce1195ccc797807
Parents: 962d6a2
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Tue Oct 23 12:05:23 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Oct 23 12:05:23 2018 +0300
----------------------------------------------------------------------
.../cache/mvcc/MvccProcessorImpl.java | 228 ++++++++-----------
.../cache/mvcc/CacheMvccAbstractTest.java | 21 +-
...cheMvccSelectForUpdateQueryAbstractTest.java | 11 +-
3 files changed, 115 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1584a8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index 9fcafb0..e58151f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -90,8 +90,8 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridCursor;
-import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -115,7 +115,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker.MVCC_TRACKER_ID_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA;
@@ -181,9 +180,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
*/
private final Object mux = new Object();
- /** For tests only. */
- private volatile Throwable vacuumError;
-
/** */
private final GridAtomicLong futIdCntr = new GridAtomicLong(0);
@@ -1136,8 +1132,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
crdVer == 0 && ctx.localNodeId().equals(crd0.nodeId()))
return new GridFinishedFuture<>(new VacuumMetrics());
- final GridCompoundIdentityFuture<VacuumMetrics> res =
- new GridCompoundIdentityFuture<>(new VacuumMetricsReducer());
+ final GridFutureAdapter<VacuumMetrics> res = new GridFutureAdapter<>();
MvccSnapshot snapshot;
@@ -1174,28 +1169,10 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
/**
- * For tests only.
- *
- * @return Vacuum error.
- */
- Throwable vacuumError() {
- return vacuumError;
- }
-
- /**
- * For tests only.
- *
- * @param e Vacuum error.
- */
- void vacuumError(Throwable e) {
- this.vacuumError = e;
- }
-
- /**
* @param res Result.
* @param snapshot Snapshot.
*/
- private void continueRunVacuum(GridCompoundIdentityFuture<VacuumMetrics> res, MvccSnapshot snapshot) {
+ private void continueRunVacuum(GridFutureAdapter<VacuumMetrics> res, MvccSnapshot snapshot) {
ackTxCommit(snapshot)
.listen(new IgniteInClosure<IgniteInternalFuture>() {
@Override public void apply(IgniteInternalFuture fut) {
@@ -1220,23 +1197,45 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
return;
}
+ GridCompoundIdentityFuture<VacuumMetrics> res0 =
+ new GridCompoundIdentityFuture<VacuumMetrics>(new VacuumMetricsReducer()) {
+ /** {@inheritDoc} */
+ @Override protected void logError(IgniteLogger log, String msg, Throwable e) {
+ // no-op
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void logDebug(IgniteLogger log, String msg) {
+ // no-op
+ }
+ };
+
for (CacheGroupContext grp : ctx.cache().cacheGroups()) {
if (grp.mvccEnabled()) {
- for (GridDhtLocalPartition part : grp.topology().localPartitions()) {
- VacuumTask task = new VacuumTask(snapshot, part);
+ grp.topology().readLock();
+
+ try {
+ for (GridDhtLocalPartition part : grp.topology().localPartitions()) {
+ VacuumTask task = new VacuumTask(snapshot, part);
- cleanupQueue.offer(task);
+ cleanupQueue.offer(task);
- res.add(task);
+ res0.add(task);
+ }
+ }
+ finally {
+ grp.topology().readUnlock();
}
}
}
- }
- res.listen(new CI1<IgniteInternalFuture<VacuumMetrics>>() {
- @Override public void apply(IgniteInternalFuture<VacuumMetrics> fut) {
+ res0.markInitialized();
+
+ res0.listen(future -> {
+ VacuumMetrics metrics = null; Throwable ex = null;
+
try {
- VacuumMetrics metrics = fut.get();
+ metrics = future.get();
if (U.assertionsEnabled()) {
MvccCoordinator crd = currentCoordinator();
@@ -1260,14 +1259,17 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
catch (NodeStoppingException ignored) {
if (log.isDebugEnabled())
log.debug("Cannot complete vacuum (node is stopping).");
+
+ metrics = new VacuumMetrics();
}
catch (Throwable e) {
- U.error(log, "Vacuum error.", e);
+ ex = new GridClosureException(e);
}
- }
- });
- res.markInitialized();
+ res.onDone(metrics, ex);
+ });
+ }
+
}
catch (Throwable e) {
completeWithException(res, e);
@@ -2138,13 +2140,13 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
}
catch (IgniteInterruptedCheckedException e) {
- throw e;
+ throw e; // Cancelled.
}
catch (Throwable e) {
- prc.vacuumError(e);
-
if (e instanceof Error)
throw (Error) e;
+
+ U.error(log, "Vacuum error.", e);
}
long delay = nextScheduledTime - U.currentTimeMillis();
@@ -2179,20 +2181,29 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
VacuumTask task = cleanupQueue.take();
try {
- if (task.part().state() != OWNING) {
- task.part().group().preloader().rebalanceFuture()
- .listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> future) {
- cleanupQueue.add(task);
- }
- });
+ switch (task.part().state()) {
+ case EVICTED:
+ case RENTING:
+ task.onDone(new VacuumMetrics());
- continue;
- }
+ break;
+ case MOVING:
+ task.part().group().preloader().rebalanceFuture().listen(f -> cleanupQueue.add(task));
- task.onDone(processPartition(task));
+ break;
+ case OWNING:
+ task.onDone(processPartition(task));
+
+ break;
+ case LOST:
+ task.onDone(new IgniteCheckedException("Partition is lost."));
+
+ break;
+ }
}
catch (IgniteInterruptedCheckedException e) {
+ task.onDone(e);
+
throw e; // Cancelled.
}
catch (Throwable e) {
@@ -2217,7 +2228,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
VacuumMetrics metrics = new VacuumMetrics();
- if (!canRunVacuum(part, null) || !part.reserve())
+ if (!part.reserve())
return metrics;
int curCacheId = CU.UNDEFINED_CACHE_ID;
@@ -2289,63 +2300,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
return metrics;
}
- catch (Exception e) {
- if (canRunVacuum(part, curCacheId))
- throw e; // Unexpected error.
-
- U.warn(log, "Error occurred during the vacuum. Skip vacuuming for the current partition. " +
- "[cacheId=" + curCacheId + ", part=" + part + ", err=" + e.getMessage() + ']', e);
-
- return new VacuumMetrics();
- }
finally {
part.release();
}
}
- /**
- * @param part Partition.
- * @param cacheId Cache id.
- * @return {@code True} if we can vacuum given partition.
- */
- private boolean canRunVacuum(GridDhtLocalPartition part, Integer cacheId) {
- if (part == null || part.state() != OWNING)
- return false;
-
- CacheGroupContext grp = part.group();
-
- assert grp != null;
-
- List<GridCacheContext> caches = grp.caches();
-
- if (F.isEmpty(caches))
- return false;
-
- if (grp.shared().kernalContext().isStopping())
- return false;
-
- if (cacheId == null && grp.sharedGroup())
- return true; // Cache context is unknown, but we can try to run vacuum.
-
- GridCacheContext ctx0;
-
- if (grp.sharedGroup()) {
- assert cacheId != null && cacheId != CU.UNDEFINED_CACHE_ID;
-
- if (!grp.cacheIds().contains(cacheId))
- return false;
-
- ctx0 = grp.shared().cacheContext(cacheId);
- }
- else
- ctx0 = caches.get(0);
-
- if (ctx0 == null)
- return false;
-
- return !grp.shared().closed(ctx0);
- }
-
/** */
@SuppressWarnings("unchecked")
@NotNull private Object addRest(@Nullable Object rest, MvccDataRow row) {
@@ -2417,47 +2376,54 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
Object rest, GridCacheContext cctx, VacuumMetrics metrics) throws IgniteCheckedException {
assert key != null && cctx != null && (!F.isEmpty(cleanupRows) || rest != null);
- long cleanupStartNanoTime = System.nanoTime();
+ cctx.gate().enter();
- GridCacheEntryEx entry = cctx.cache().entryEx(key);
+ try {
+ long cleanupStartNanoTime = System.nanoTime();
- while (true) {
- entry.lockEntry();
+ GridCacheEntryEx entry = cctx.cache().entryEx(key);
- if (!entry.obsolete())
- break;
+ while (true) {
+ entry.lockEntry();
- entry.unlockEntry();
+ if (!entry.obsolete())
+ break;
- entry = cctx.cache().entryEx(key);
- }
+ entry.unlockEntry();
- cctx.shared().database().checkpointReadLock();
+ entry = cctx.cache().entryEx(key);
+ }
- int cleaned = 0;
+ cctx.shared().database().checkpointReadLock();
- try {
- if (cleanupRows != null)
- cleaned = part.dataStore().cleanup(cctx, cleanupRows);
+ int cleaned = 0;
+
+ try {
+ if (cleanupRows != null)
+ cleaned = part.dataStore().cleanup(cctx, cleanupRows);
- if (rest != null) {
- if (rest.getClass() == ArrayList.class) {
- for (MvccDataRow row : ((List<MvccDataRow>)rest)) {
- part.dataStore().updateTxState(cctx, row);
+ if (rest != null) {
+ if (rest.getClass() == ArrayList.class) {
+ for (MvccDataRow row : ((List<MvccDataRow>)rest)) {
+ part.dataStore().updateTxState(cctx, row);
+ }
}
+ else
+ part.dataStore().updateTxState(cctx, (MvccDataRow)rest);
}
- else
- part.dataStore().updateTxState(cctx, (MvccDataRow)rest);
}
- }
- finally {
- cctx.shared().database().checkpointReadUnlock();
+ finally {
+ cctx.shared().database().checkpointReadUnlock();
- entry.unlockEntry();
- cctx.evicts().touch(entry, AffinityTopologyVersion.NONE);
+ entry.unlockEntry();
+ cctx.evicts().touch(entry, AffinityTopologyVersion.NONE);
- metrics.addCleanupNanoTime(System.nanoTime() - cleanupStartNanoTime);
- metrics.addCleanupRowsCnt(cleaned);
+ metrics.addCleanupNanoTime(System.nanoTime() - cleanupStartNanoTime);
+ metrics.addCleanupRowsCnt(cleaned);
+ }
+ }
+ finally {
+ cctx.gate().leave();
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1584a8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index ec6b78a..57f714a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -1532,14 +1532,21 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
protected void verifyOldVersionsCleaned() throws Exception {
- runVacuumSync();
+ boolean retry;
- awaitPartitionMapExchange();
+ try {
+ runVacuumSync();
- // Check versions.
- boolean cleaned = checkOldVersions(false);
+ // Check versions.
+ retry = !checkOldVersions(false);
+ }
+ catch (Exception e) {
+ U.warn(log(), "Failed to perform vacuum, will retry.", e);
- if (!cleaned) { // Retry on a stable topology with a newer snapshot.
+ retry = true;
+ }
+
+ if (retry) { // Retry on a stable topology with a newer snapshot.
awaitPartitionMapExchange();
runVacuumSync();
@@ -1605,10 +1612,6 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
assert GridTestUtils.getFieldValue(crd, "txLog") != null;
- Throwable vacuumError = crd.vacuumError();
-
- assertNull(X.getFullStackTrace(vacuumError), vacuumError);
-
fut.add(crd.runVacuum());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1584a8f/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
index c6584e9..16b45ab 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.util.typedef.X;
@@ -57,7 +58,7 @@ public abstract class CacheMvccSelectForUpdateQueryAbstractTest extends CacheMvc
disableScheduledVacuum = getName().equals("testSelectForUpdateAfterAbortedTx");
- startGrids(3);
+ IgniteEx grid = startGrid(0);
CacheConfiguration seg = new CacheConfiguration("segmented*");
@@ -66,11 +67,9 @@ public abstract class CacheMvccSelectForUpdateQueryAbstractTest extends CacheMvc
if (seg.getCacheMode() == PARTITIONED)
seg.setQueryParallelism(4);
- grid(0).addCacheConfiguration(seg);
+ grid.addCacheConfiguration(seg);
- Thread.sleep(1000L);
-
- try (Connection c = connect(grid(0))) {
+ try (Connection c = connect(grid)) {
execute(c, "create table person (id int primary key, firstName varchar, lastName varchar) " +
"with \"atomicity=transactional_snapshot,cache_name=Person\"");
@@ -90,6 +89,8 @@ public abstract class CacheMvccSelectForUpdateQueryAbstractTest extends CacheMvc
tx.commit();
}
}
+
+ startGridsMultiThreaded(1, 2);
}
/**