You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original archive page) is not preserved in this plain-text rendering.
Posted to commits@ignite.apache.org by av...@apache.org on 2019/12/25 06:29:49 UTC

[ignite] branch master updated: IGNITE-12272 Delayed TX recovery (#7172)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new de4e7aa  IGNITE-12272 Delayed TX recovery (#7172)
de4e7aa is described below

commit de4e7aa834b2fd7f3d18d4425aa29ca1971b02ed
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Wed Dec 25 09:29:29 2019 +0300

    IGNITE-12272 Delayed TX recovery (#7172)
    
    Signed-off-by: Anton Vinogradov <av...@apache.org>
---
 .../org/apache/ignite/IgniteSystemProperties.java  |   6 -
 .../cache/transactions/IgniteTxManager.java        |  42 ++--
 .../GridCacheNodeFailureAbstractTest.java          |   4 -
 .../GridCachePartitionedTxSalvageSelfTest.java     | 258 ---------------------
 .../testsuites/IgniteCacheFailoverTestSuite2.java  |   2 -
 5 files changed, 14 insertions(+), 298 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 600495e..19dfeb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -252,12 +252,6 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_SLOW_TX_WARN_TIMEOUT = "IGNITE_SLOW_TX_WARN_TIMEOUT";
 
     /**
-     * Timeout after which all uncompleted transactions originated by left node will be
-     * salvaged (i.e. invalidated and committed).
-     */
-    public static final String IGNITE_TX_SALVAGE_TIMEOUT = "IGNITE_TX_SALVAGE_TIMEOUT";
-
-    /**
      * Specifies maximum number of iterations for deadlock detection procedure.
      * If value of this property is less then or equal to zero then deadlock detection will be disabled.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 2166da4..a6fd06c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -38,6 +38,8 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteFeatures;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -80,7 +82,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetect
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cluster.BaselineTopology;
 import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
@@ -96,7 +97,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteReducer;
-import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.systemview.view.TransactionView;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -113,7 +113,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_TRANSACTION_TIME_D
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_OWNER_DUMP_REQUESTS_ALLOWED;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_SALVAGE_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -157,9 +156,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /** Slow tx warn timeout (initialized to 0). */
     private static final int SLOW_TX_WARN_TIMEOUT = Integer.getInteger(IGNITE_SLOW_TX_WARN_TIMEOUT, 0);
 
-    /** Tx salvage timeout. */
-    private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100);
-
     /** One phase commit deferred ack request timeout. */
     public static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT =
         Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT, 500);
@@ -323,9 +319,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     if (evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) {
                         UUID nodeId = evt.eventNode().id();
 
-                        // Wait some time in case there are some unprocessed messages from failed node.
-                        cctx.time().addTimeoutObject(
-                            new NodeFailureTimeoutObject(evt.eventNode(), cctx.coordinators().currentCoordinator()));
+                        IgniteInternalFuture<?> recInitFut = cctx.kernalContext().closure().runLocalSafe(
+                            new TxRecoveryInitRunnable(evt.eventNode(), cctx.coordinators().currentCoordinator()));
+
+                        recInitFut.listen(future -> {
+                            if (future.error() != null)
+                                cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, future.error()));
+                        });
 
                         for (TxDeadlockFuture fut : deadlockDetectFuts.values())
                             fut.onNodeLeft(nodeId);
@@ -2737,9 +2737,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * Timeout object for node failure handler.
+     * Transactions recovery initialization runnable.
      */
-    private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
+    private final class TxRecoveryInitRunnable implements Runnable {
         /** */
         private final ClusterNode node;
 
@@ -2750,17 +2750,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
          * @param node Failed node.
          * @param mvccCrd Mvcc coordinator at time of node failure.
          */
-        private NodeFailureTimeoutObject(ClusterNode node, MvccCoordinator mvccCrd) {
-            super(IgniteUuid.fromUuid(cctx.localNodeId()), TX_SALVAGE_TIMEOUT);
-
+        private TxRecoveryInitRunnable(ClusterNode node, MvccCoordinator mvccCrd) {
             this.node = node;
             this.mvccCrd = mvccCrd;
         }
 
-        /**
-         *
-         */
-        private void onTimeout0() {
+        /** {@inheritDoc} */
+        @Override public void run() {
             try {
                 cctx.kernalContext().gateway().readLock();
             }
@@ -2853,16 +2849,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 cctx.kernalContext().gateway().readUnlock();
             }
         }
-
-        /** {@inheritDoc} */
-        @Override public void onTimeout() {
-            // Should not block timeout thread.
-            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                @Override public void run() {
-                    onTimeout0();
-                }
-            });
-        }
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java
index 2f5c822..1c1d1fd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java
@@ -42,8 +42,6 @@ import org.apache.ignite.transactions.TransactionIsolation;
 import org.junit.Test;
 
 import static org.apache.ignite.IgniteState.STOPPED;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_SALVAGE_TIMEOUT;
-import static org.apache.ignite.IgniteSystemProperties.getInteger;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
@@ -187,8 +185,6 @@ public abstract class GridCacheNodeFailureAbstractTest extends GridCommonAbstrac
 
             f.get();
 
-            U.sleep(getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 3000));
-
             IgniteCache<Integer, String> checkCache = jcache(checkIdx);
 
             boolean locked = false;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
deleted file mode 100644
index 4542449..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.ignite.transactions.Transaction;
-import org.apache.ignite.transactions.TransactionConcurrency;
-import org.junit.Test;
-
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_SALVAGE_TIMEOUT;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
-import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
-/**
- * Test tx salvage.
- */
-public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTest {
-    /** Grid count. */
-    private static final int GRID_CNT = 5;
-
-    /** Key count. */
-    private static final int KEY_CNT = 10;
-
-    /** Salvage timeout system property value. */
-    private static final Integer SALVAGE_TIMEOUT = 5000;
-
-    /** Difference between salvage timeout and actual wait time when performing "before salvage" tests. */
-    private static final int DELTA_BEFORE = 1000;
-
-    /** How much time to wait after salvage timeout when performing "after salvage" tests. */
-    private static final int DELTA_AFTER = 1000;
-
-    /** Salvage timeout system property value before alteration. */
-    private static String salvageTimeoutOld;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
-        IgniteConfiguration c = super.getConfiguration(igniteInstanceName);
-
-        CacheConfiguration cc = defaultCacheConfiguration();
-
-        cc.setCacheMode(PARTITIONED);
-        cc.setAffinity(new RendezvousAffinityFunction(false, 18));
-        cc.setBackups(1);
-        cc.setRebalanceMode(SYNC);
-
-        c.setCacheConfiguration(cc);
-
-        return c;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        // Set salvage timeout system property.
-        salvageTimeoutOld = System.setProperty(IGNITE_TX_SALVAGE_TIMEOUT, SALVAGE_TIMEOUT.toString());
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        // Restore salvage timeout system property to its initial state.
-        if (salvageTimeoutOld != null)
-            System.setProperty(IGNITE_TX_SALVAGE_TIMEOUT, salvageTimeoutOld);
-        else
-            System.clearProperty(IGNITE_TX_SALVAGE_TIMEOUT);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        // Start the grid.
-        startGridsMultiThreaded(GRID_CNT);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-
-        System.gc();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testOptimisticTxSalvageBeforeTimeout() throws Exception {
-        checkSalvageBeforeTimeout(OPTIMISTIC, true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testPessimisticcTxSalvageBeforeTimeout() throws Exception {
-        checkSalvageBeforeTimeout(PESSIMISTIC, false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testOptimisticTxSalvageAfterTimeout() throws Exception {
-        checkSalvageAfterTimeout(OPTIMISTIC, true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testPessimisticTxSalvageAfterTimeout() throws Exception {
-        checkSalvageAfterTimeout(PESSIMISTIC, false);
-    }
-
-    /**
-     * Check whether caches has no transactions after salvage timeout.
-     *
-     * @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC).
-     * @param prepare Whether to prepare transaction state (i.e. call {@link GridNearTxLocal#prepare(boolean)}).
-     * @throws Exception If failed.
-     */
-    private void checkSalvageAfterTimeout(TransactionConcurrency mode, boolean prepare) throws Exception {
-        startTxAndPutKeys(mode, prepare);
-
-        stopNodeAndSleep(SALVAGE_TIMEOUT + DELTA_AFTER);
-
-        for (int i = 1; i < GRID_CNT; i++) {
-            checkTxsEmpty(near(i).context());
-            checkTxsEmpty(dht(i).context());
-        }
-    }
-
-    /**
-     * Check whether caches still has all transactions before salvage timeout.
-     *
-     * @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC).
-     * @param prepare Whether to prepare transaction state
-     *                (i.e. call {@link GridNearTxLocal#prepare(boolean)}).
-     * @throws Exception If failed.
-     */
-    private void checkSalvageBeforeTimeout(TransactionConcurrency mode, boolean prepare) throws Exception {
-        startTxAndPutKeys(mode, prepare);
-
-        List<Integer> nearSizes = new ArrayList<>(GRID_CNT - 1);
-        List<Integer> dhtSizes = new ArrayList<>(GRID_CNT - 1);
-
-        for (int i = 1; i < GRID_CNT; i++) {
-            nearSizes.add(near(i).context().tm().activeTransactions().size());
-            dhtSizes.add(dht(i).context().tm().activeTransactions().size());
-        }
-
-        stopNodeAndSleep(SALVAGE_TIMEOUT - DELTA_BEFORE);
-
-        for (int i = 1; i < GRID_CNT; i++) {
-            checkTxsNotEmpty(near(i).context(), nearSizes.get(i - 1));
-            checkTxsNotEmpty(dht(i).context(), dhtSizes.get(i - 1));
-        }
-    }
-
-    /**
-     * Start new transaction on the grid(0) and put some keys to it.
-     *
-     * @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC).
-     * @param prepare Whether to prepare transaction state (i.e. call {@link GridNearTxLocal#prepare(boolean)}).
-     * @throws Exception If failed.
-     */
-    private void startTxAndPutKeys(final TransactionConcurrency mode, final boolean prepare) throws Exception {
-        Ignite ignite = grid(0);
-
-        final Collection<Integer> keys = nearKeys(ignite.cache(DEFAULT_CACHE_NAME), KEY_CNT, 0);
-
-        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-            @Override public void run() {
-                IgniteCache<Object, Object> c = jcache(0);
-
-                try {
-                    Transaction tx = grid(0).transactions().txStart(mode, REPEATABLE_READ);
-
-                    for (Integer key : keys)
-                        c.put(key, "val" + key);
-
-                    if (prepare)
-                        ((TransactionProxyImpl)tx).tx().prepare(true);
-                }
-                catch (IgniteCheckedException e) {
-                    info("Failed to put keys to cache: " + e.getMessage());
-                }
-            }
-        }, 1);
-
-        fut.get();
-    }
-
-    /**
-     * Stop the very first grid node (the one with 0 index) and sleep for the given amount of time.
-     *
-     * @param timeout Sleep timeout in milliseconds.
-     * @throws Exception If failed.
-     */
-    private void stopNodeAndSleep(long timeout) throws Exception {
-        stopGrid(getTestIgniteInstanceName(0), false, false);
-
-        info("Stopped grid.");
-
-        U.sleep(timeout);
-    }
-
-    /**
-     * Checks that transaction manager for cache context does not have any pending transactions.
-     *
-     * @param ctx Cache context.
-     */
-    private void checkTxsEmpty(GridCacheContext ctx) {
-        Collection txs = ctx.tm().activeTransactions();
-
-        assert txs.isEmpty() : "Not all transactions were salvaged: " + txs;
-    }
-
-    /**
-     * Checks that transaction manager for cache context has expected number of pending transactions.
-     *
-     * @param ctx Cache context.
-     * @param exp Expected amount of transactions.
-     */
-    private void checkTxsNotEmpty(GridCacheContext ctx, int exp) {
-        int size = ctx.tm().activeTransactions().size();
-
-        assertEquals("Some transactions were salvaged unexpectedly", exp, size);
-    }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
index 87b09fe..6d965f1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
@@ -27,7 +27,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheCr
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicFailoverSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicReplicatedFailoverSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedFailoverSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedTxSalvageSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedFailoverSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteChangingBaselineDownCachePutAllFailoverTest;
 import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteChangingBaselineUpCachePutAllFailoverTest;
@@ -37,7 +36,6 @@ import org.junit.runners.Suite;
 /** */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-    GridCachePartitionedTxSalvageSelfTest.class,
     CacheGetFromJobTest.class,
 
     GridCacheAtomicFailoverSelfTest.class,