You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) was not preserved in this text.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/24 06:09:28 UTC

[02/11] ignite git commit: IGNITE-9911: MVCC: Fixed a hang during vacuum. This closes #5035.

IGNITE-9911: MVCC: Fixed a hang during vacuum. This closes #5035.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1584a8f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1584a8f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1584a8f

Branch: refs/heads/ignite-9720
Commit: b1584a8f2636d390b3f8cc754ce1195ccc797807
Parents: 962d6a2
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Tue Oct 23 12:05:23 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Oct 23 12:05:23 2018 +0300

----------------------------------------------------------------------
 .../cache/mvcc/MvccProcessorImpl.java           | 228 ++++++++-----------
 .../cache/mvcc/CacheMvccAbstractTest.java       |  21 +-
 ...cheMvccSelectForUpdateQueryAbstractTest.java |  11 +-
 3 files changed, 115 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b1584a8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index 9fcafb0..e58151f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -90,8 +90,8 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridCursor;
-import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -115,7 +115,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker.MVCC_TRACKER_ID_NA;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA;
@@ -181,9 +180,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
      */
     private final Object mux = new Object();
 
-    /** For tests only. */
-    private volatile Throwable vacuumError;
-
     /** */
     private final GridAtomicLong futIdCntr = new GridAtomicLong(0);
 
@@ -1136,8 +1132,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             crdVer == 0 && ctx.localNodeId().equals(crd0.nodeId()))
             return new GridFinishedFuture<>(new VacuumMetrics());
 
-        final GridCompoundIdentityFuture<VacuumMetrics> res =
-            new GridCompoundIdentityFuture<>(new VacuumMetricsReducer());
+        final GridFutureAdapter<VacuumMetrics> res = new GridFutureAdapter<>();
 
         MvccSnapshot snapshot;
 
@@ -1174,28 +1169,10 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
     }
 
     /**
-     * For tests only.
-     *
-     * @return Vacuum error.
-     */
-    Throwable vacuumError() {
-        return vacuumError;
-    }
-
-    /**
-     * For tests only.
-     *
-     * @param e Vacuum error.
-     */
-    void vacuumError(Throwable e) {
-        this.vacuumError = e;
-    }
-
-    /**
      * @param res Result.
      * @param snapshot Snapshot.
      */
-    private void continueRunVacuum(GridCompoundIdentityFuture<VacuumMetrics> res, MvccSnapshot snapshot) {
+    private void continueRunVacuum(GridFutureAdapter<VacuumMetrics> res, MvccSnapshot snapshot) {
         ackTxCommit(snapshot)
             .listen(new IgniteInClosure<IgniteInternalFuture>() {
                 @Override public void apply(IgniteInternalFuture fut) {
@@ -1220,23 +1197,45 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
                                     return;
                                 }
 
+                                GridCompoundIdentityFuture<VacuumMetrics> res0 =
+                                    new GridCompoundIdentityFuture<VacuumMetrics>(new VacuumMetricsReducer()) {
+                                        /** {@inheritDoc} */
+                                        @Override protected void logError(IgniteLogger log, String msg, Throwable e) {
+                                            // no-op
+                                        }
+
+                                        /** {@inheritDoc} */
+                                        @Override protected void logDebug(IgniteLogger log, String msg) {
+                                            // no-op
+                                        }
+                                    };
+
                                 for (CacheGroupContext grp : ctx.cache().cacheGroups()) {
                                     if (grp.mvccEnabled()) {
-                                        for (GridDhtLocalPartition part : grp.topology().localPartitions()) {
-                                            VacuumTask task = new VacuumTask(snapshot, part);
+                                        grp.topology().readLock();
+
+                                        try {
+                                            for (GridDhtLocalPartition part : grp.topology().localPartitions()) {
+                                                VacuumTask task = new VacuumTask(snapshot, part);
 
-                                            cleanupQueue.offer(task);
+                                                cleanupQueue.offer(task);
 
-                                            res.add(task);
+                                                res0.add(task);
+                                            }
+                                        }
+                                        finally {
+                                            grp.topology().readUnlock();
                                         }
                                     }
                                 }
-                            }
 
-                            res.listen(new CI1<IgniteInternalFuture<VacuumMetrics>>() {
-                                @Override public void apply(IgniteInternalFuture<VacuumMetrics> fut) {
+                                res0.markInitialized();
+
+                                res0.listen(future -> {
+                                    VacuumMetrics metrics = null; Throwable ex = null;
+
                                     try {
-                                        VacuumMetrics metrics = fut.get();
+                                        metrics = future.get();
 
                                         if (U.assertionsEnabled()) {
                                             MvccCoordinator crd = currentCoordinator();
@@ -1260,14 +1259,17 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
                                     catch (NodeStoppingException ignored) {
                                         if (log.isDebugEnabled())
                                             log.debug("Cannot complete vacuum (node is stopping).");
+
+                                        metrics = new VacuumMetrics();
                                     }
                                     catch (Throwable e) {
-                                        U.error(log, "Vacuum error.", e);
+                                        ex = new GridClosureException(e);
                                     }
-                                }
-                            });
 
-                            res.markInitialized();
+                                    res.onDone(metrics, ex);
+                                });
+                            }
+
                         }
                         catch (Throwable e) {
                             completeWithException(res, e);
@@ -2138,13 +2140,13 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
                     }
                 }
                 catch (IgniteInterruptedCheckedException e) {
-                    throw e;
+                    throw e; // Cancelled.
                 }
                 catch (Throwable e) {
-                    prc.vacuumError(e);
-
                     if (e instanceof Error)
                         throw (Error) e;
+
+                    U.error(log, "Vacuum error.", e);
                 }
 
                 long delay = nextScheduledTime - U.currentTimeMillis();
@@ -2179,20 +2181,29 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
                 VacuumTask task = cleanupQueue.take();
 
                 try {
-                    if (task.part().state() != OWNING) {
-                        task.part().group().preloader().rebalanceFuture()
-                            .listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
-                                @Override public void apply(IgniteInternalFuture<Boolean> future) {
-                                    cleanupQueue.add(task);
-                                }
-                            });
+                    switch (task.part().state()) {
+                        case EVICTED:
+                        case RENTING:
+                            task.onDone(new VacuumMetrics());
 
-                        continue;
-                    }
+                            break;
+                        case MOVING:
+                            task.part().group().preloader().rebalanceFuture().listen(f -> cleanupQueue.add(task));
 
-                    task.onDone(processPartition(task));
+                            break;
+                        case OWNING:
+                            task.onDone(processPartition(task));
+
+                            break;
+                        case LOST:
+                            task.onDone(new IgniteCheckedException("Partition is lost."));
+
+                            break;
+                    }
                 }
                 catch (IgniteInterruptedCheckedException e) {
+                    task.onDone(e);
+
                     throw e; // Cancelled.
                 }
                 catch (Throwable e) {
@@ -2217,7 +2228,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
 
             VacuumMetrics metrics = new VacuumMetrics();
 
-            if (!canRunVacuum(part, null) || !part.reserve())
+            if (!part.reserve())
                 return metrics;
 
             int curCacheId = CU.UNDEFINED_CACHE_ID;
@@ -2289,63 +2300,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
 
                 return metrics;
             }
-            catch (Exception e) {
-                if (canRunVacuum(part, curCacheId))
-                    throw e; // Unexpected error.
-
-                U.warn(log, "Error occurred during the vacuum. Skip vacuuming for the current partition. " +
-                    "[cacheId=" + curCacheId + ", part=" + part + ", err=" + e.getMessage() + ']', e);
-
-                return new VacuumMetrics();
-            }
             finally {
                 part.release();
             }
         }
 
-        /**
-         * @param part Partition.
-         * @param cacheId Cache id.
-         * @return {@code True} if we can vacuum given partition.
-         */
-        private boolean canRunVacuum(GridDhtLocalPartition part, Integer cacheId) {
-            if (part == null || part.state() != OWNING)
-                return false;
-
-            CacheGroupContext grp = part.group();
-
-            assert grp != null;
-
-            List<GridCacheContext> caches = grp.caches();
-
-            if (F.isEmpty(caches))
-                return false;
-
-            if (grp.shared().kernalContext().isStopping())
-                return false;
-
-            if (cacheId == null && grp.sharedGroup())
-                return true; // Cache context is unknown, but we can try to run vacuum.
-
-            GridCacheContext ctx0;
-
-            if (grp.sharedGroup()) {
-                assert cacheId != null && cacheId != CU.UNDEFINED_CACHE_ID;
-
-                if (!grp.cacheIds().contains(cacheId))
-                    return false;
-
-                ctx0 = grp.shared().cacheContext(cacheId);
-            }
-            else
-                ctx0 = caches.get(0);
-
-            if (ctx0 == null)
-                return false;
-
-            return !grp.shared().closed(ctx0);
-        }
-
         /** */
         @SuppressWarnings("unchecked")
         @NotNull private Object addRest(@Nullable Object rest, MvccDataRow row) {
@@ -2417,47 +2376,54 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             Object rest, GridCacheContext cctx, VacuumMetrics metrics) throws IgniteCheckedException {
             assert key != null && cctx != null && (!F.isEmpty(cleanupRows) || rest != null);
 
-            long cleanupStartNanoTime = System.nanoTime();
+            cctx.gate().enter();
 
-            GridCacheEntryEx entry = cctx.cache().entryEx(key);
+            try {
+                long cleanupStartNanoTime = System.nanoTime();
 
-            while (true) {
-                entry.lockEntry();
+                GridCacheEntryEx entry = cctx.cache().entryEx(key);
 
-                if (!entry.obsolete())
-                    break;
+                while (true) {
+                    entry.lockEntry();
 
-                entry.unlockEntry();
+                    if (!entry.obsolete())
+                        break;
 
-                entry = cctx.cache().entryEx(key);
-            }
+                    entry.unlockEntry();
 
-            cctx.shared().database().checkpointReadLock();
+                    entry = cctx.cache().entryEx(key);
+                }
 
-            int cleaned = 0;
+                cctx.shared().database().checkpointReadLock();
 
-            try {
-                if (cleanupRows != null)
-                    cleaned = part.dataStore().cleanup(cctx, cleanupRows);
+                int cleaned = 0;
+
+                try {
+                    if (cleanupRows != null)
+                        cleaned = part.dataStore().cleanup(cctx, cleanupRows);
 
-                if (rest != null) {
-                    if (rest.getClass() == ArrayList.class) {
-                        for (MvccDataRow row : ((List<MvccDataRow>)rest)) {
-                            part.dataStore().updateTxState(cctx, row);
+                    if (rest != null) {
+                        if (rest.getClass() == ArrayList.class) {
+                            for (MvccDataRow row : ((List<MvccDataRow>)rest)) {
+                                part.dataStore().updateTxState(cctx, row);
+                            }
                         }
+                        else
+                            part.dataStore().updateTxState(cctx, (MvccDataRow)rest);
                     }
-                    else
-                        part.dataStore().updateTxState(cctx, (MvccDataRow)rest);
                 }
-            }
-            finally {
-                cctx.shared().database().checkpointReadUnlock();
+                finally {
+                    cctx.shared().database().checkpointReadUnlock();
 
-                entry.unlockEntry();
-                cctx.evicts().touch(entry, AffinityTopologyVersion.NONE);
+                    entry.unlockEntry();
+                    cctx.evicts().touch(entry, AffinityTopologyVersion.NONE);
 
-                metrics.addCleanupNanoTime(System.nanoTime() - cleanupStartNanoTime);
-                metrics.addCleanupRowsCnt(cleaned);
+                    metrics.addCleanupNanoTime(System.nanoTime() - cleanupStartNanoTime);
+                    metrics.addCleanupRowsCnt(cleaned);
+                }
+            }
+            finally {
+                cctx.gate().leave();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1584a8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index ec6b78a..57f714a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -1532,14 +1532,21 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     protected void verifyOldVersionsCleaned() throws Exception {
-        runVacuumSync();
+        boolean retry;
 
-        awaitPartitionMapExchange();
+        try {
+            runVacuumSync();
 
-        // Check versions.
-        boolean cleaned = checkOldVersions(false);
+            // Check versions.
+            retry = !checkOldVersions(false);
+        }
+        catch (Exception e) {
+            U.warn(log(), "Failed to perform vacuum, will retry.", e);
 
-        if (!cleaned) { // Retry on a stable topology with a newer snapshot.
+            retry = true;
+        }
+
+        if (retry) { // Retry on a stable topology with a newer snapshot.
             awaitPartitionMapExchange();
 
             runVacuumSync();
@@ -1605,10 +1612,6 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
 
                 assert GridTestUtils.getFieldValue(crd, "txLog") != null;
 
-                Throwable vacuumError = crd.vacuumError();
-
-                assertNull(X.getFullStackTrace(vacuumError), vacuumError);
-
                 fut.add(crd.runVacuum());
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1584a8f/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
index c6584e9..16b45ab 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.util.typedef.X;
@@ -57,7 +58,7 @@ public abstract class CacheMvccSelectForUpdateQueryAbstractTest extends CacheMvc
 
         disableScheduledVacuum = getName().equals("testSelectForUpdateAfterAbortedTx");
 
-        startGrids(3);
+        IgniteEx grid = startGrid(0);
 
         CacheConfiguration seg = new CacheConfiguration("segmented*");
 
@@ -66,11 +67,9 @@ public abstract class CacheMvccSelectForUpdateQueryAbstractTest extends CacheMvc
         if (seg.getCacheMode() == PARTITIONED)
             seg.setQueryParallelism(4);
 
-        grid(0).addCacheConfiguration(seg);
+        grid.addCacheConfiguration(seg);
 
-        Thread.sleep(1000L);
-
-        try (Connection c = connect(grid(0))) {
+        try (Connection c = connect(grid)) {
             execute(c, "create table person (id int primary key, firstName varchar, lastName varchar) " +
                 "with \"atomicity=transactional_snapshot,cache_name=Person\"");
 
@@ -90,6 +89,8 @@ public abstract class CacheMvccSelectForUpdateQueryAbstractTest extends CacheMvc
                 tx.commit();
             }
         }
+
+        startGridsMultiThreaded(1, 2);
     }
 
     /**