You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/04/20 08:26:21 UTC

[ignite] branch master updated: IGNITE-13444 Durable tasks are cancelled on grid deactivation, starting new tasks is prohibited - Fixes #8244.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7498164  IGNITE-13444 Durable tasks are cancelled on grid deactivation, starting new tasks is prohibited - Fixes #8244.
7498164 is described below

commit 749816481149b9a54e9b780ea2289aafd1eeecef
Author: makedonskaya <m....@gmail.com>
AuthorDate: Tue Apr 20 11:25:28 2021 +0300

    IGNITE-13444 Durable tasks are cancelled on grid deactivation, starting new tasks is prohibited - Fixes #8244.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../cluster/GridClusterStateProcessor.java         |  2 +
 .../localtask/DurableBackgroundTasksProcessor.java | 59 ++++++++++++++----
 .../db/LongDestroyDurableBackgroundTaskTest.java   | 70 +++++++++++++++++++---
 3 files changed, 112 insertions(+), 19 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index a774d520..73afdf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -729,6 +729,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
                     nodeIds
                 );
 
+                ctx.durableBackgroundTasksProcessor().onStateChange(msg);
+
                 if (msg.forceChangeBaselineTopology())
                     newState.setTransitionResult(msg.requestId(), msg.state());
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
index c703df4..dfbab72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadO
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.thread.IgniteThread;
@@ -64,6 +66,16 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
     /** Durable background tasks map. */
     private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>();
 
+    /** Set of started tasks' names. */
+    private final Set<String> startedTasks = new GridConcurrentHashSet<>();
+
+    /**
+     * Forbids starting new tasks. Cleared when the cluster is activated again, after which pending tasks are rerun.
+     *
+     * @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
+     */
+    private volatile boolean forbidStartingNewTasks;
+
     /**
      * @param ctx Kernal context.
      */
@@ -78,22 +90,31 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
         assert durableBackgroundTasks != null;
 
         for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
-            if (!task.isCompleted())
-                asyncDurableBackgroundTaskExecute(task, false);
+            if (!task.isCompleted() && startedTasks.add(task.shortName()))
+                asyncDurableBackgroundTaskExecute(task);
         }
     }
 
     /**
      * Creates a worker to execute single durable background task.
+     *
      * @param task Task.
-     * @param dropTaskIfFailed Whether to delete task from metastorage, if it has failed.
      */
-    private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task, boolean dropTaskIfFailed) {
+    private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task) {
         String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
 
         GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) {
+            @Override public void cancel() {
+                task.onCancel();
+
+                super.cancel();
+            }
+
             @Override protected void body() {
                 try {
+                    if (forbidStartingNewTasks)
+                        return;
+
                     log.info("Executing durable background task: " + task.shortName());
 
                     task.execute(ctx);
@@ -104,11 +125,10 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
                 }
                 catch (Throwable e) {
                     log.error("Could not execute durable background task: " + task.shortName(), e);
-
-                    if (dropTaskIfFailed)
-                        removeDurableBackgroundTask(task);
                 }
                 finally {
+                    startedTasks.remove(task.shortName());
+
                     asyncDurableBackgroundTaskWorkers.remove(this);
                 }
             }
@@ -128,8 +148,9 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
 
     /** {@inheritDoc} */
     @Override public void onKernalStop(boolean cancel) {
-        // Waiting for workers, but not cancelling them, trying to complete running tasks.
-        awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, false, log);
+        forbidStartingNewTasks = true;
+
+        awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
     }
 
     /** {@inheritDoc} */
@@ -140,9 +161,23 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
     /**
      * @param msg Message.
      */
-    public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
-        if (!msg.clusterActive())
+    public void onStateChange(ChangeGlobalStateMessage msg) {
+        if (msg.state() == ClusterState.INACTIVE) {
+            forbidStartingNewTasks = true;
+
             awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
+        }
+    }
+
+    /**
+     * @param msg Message.
+     */
+    public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
+        if (msg.state() != ClusterState.INACTIVE) {
+            forbidStartingNewTasks = false;
+
+            asyncDurableBackgroundTasksExecution();
+        }
     }
 
     /** {@inheritDoc} */
@@ -267,7 +302,7 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem
         if (CU.isPersistentCache(ccfg, ctx.config().getDataStorageConfiguration()))
             addDurableBackgroundTask(task);
 
-        asyncDurableBackgroundTaskExecute(task, false);
+        asyncDurableBackgroundTaskExecute(task);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
index 5c4cb04..5fdfa62 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
@@ -47,6 +47,7 @@ import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
@@ -163,19 +164,30 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
     /** */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         return super.getConfiguration(igniteInstanceName)
+            .setFailureHandler(new StopNodeFailureHandler())
             .setDataStorageConfiguration(
-                new DataStorageConfiguration().setDefaultDataRegionConfiguration(
-                    new DataRegionConfiguration()
-                        .setPersistenceEnabled(true)
-                        .setInitialSize(10 * 1024L * 1024L)
-                        .setMaxSize(50 * 1024L * 1024L)
-                )
-                .setCheckpointFrequency(Long.MAX_VALUE / 2)
+                new DataStorageConfiguration()
+                    .setDefaultDataRegionConfiguration(
+                        new DataRegionConfiguration()
+                            .setPersistenceEnabled(true)
+                            .setInitialSize(10 * 1024L * 1024L)
+                            .setMaxSize(50 * 1024L * 1024L)
+                    )
+                    .setDataRegionConfigurations(
+                        new DataRegionConfiguration()
+                            .setName("dr1")
+                            .setPersistenceEnabled(false)
+                    )
+                    .setCheckpointFrequency(Long.MAX_VALUE / 2)
             )
             .setCacheConfiguration(
                 new CacheConfiguration(DEFAULT_CACHE_NAME)
                     .setBackups(1)
+                    .setSqlSchema("PUBLIC"),
+                new CacheConfiguration<Integer, Integer>("TEST")
                     .setSqlSchema("PUBLIC")
+                    .setBackups(1)
+                    .setDataRegionName("dr1")
             )
             .setGridLogger(testLog);
     }
@@ -542,6 +554,50 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest
     }
 
     /**
+     * Tests cluster deactivation with a cache that resides in a non-persistent data region. The index tree deletion
+     * task must not be started when the cache is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testClusterDeactivationShouldPassWithoutErrors() throws Exception {
+        IgniteEx ignite = startGrids(NODES_COUNT);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, Integer> cache = ignite.cache("TEST");
+
+        query(cache, "create table TEST (id integer primary key, p integer, f integer) with " +
+            "\"DATA_REGION=dr1\"");
+
+        query(cache, "create index TEST_IDX on TEST (p)");
+
+        for (int i = 0; i < 5_000; i++)
+            query(cache, "insert into TEST (id, p, f) values (?, ?, ?)", i, i, i);
+
+        LogListener lsnr = LogListener.matches("Could not execute durable background task").build();
+        LogListener lsnr2 = LogListener.matches("Executing durable background task").build();
+        LogListener lsnr3 = LogListener.matches("Execution of durable background task completed").build();
+
+        testLog.registerAllListeners(lsnr, lsnr2, lsnr3);
+
+        ignite.cluster().active(false);
+
+        doSleep(1_000);
+
+        assertFalse(lsnr.check());
+        assertFalse(lsnr2.check());
+        assertFalse(lsnr3.check());
+
+        testLog.unregisterListener(lsnr);
+        testLog.unregisterListener(lsnr2);
+        testLog.unregisterListener(lsnr3);
+
+        for (int i = 0; i < NODES_COUNT; i++)
+            grid(i);
+    }
+
+    /**
      * Tests case when long index deletion operation happens. Checkpoint should run in the middle of index deletion
      * operation.
      *