You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2020/07/23 14:29:58 UTC

[ignite] branch master updated: IGNITE-13269 Waiting for operation completion is added in case of stopping case - Fixes #8056.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a17f4a  IGNITE-13269 Waiting for operation completion is added in case of stopping case - Fixes #8056.
9a17f4a is described below

commit 9a17f4a2e29e7f848e9c1f6fd480ad6ee345c6fe
Author: ktkalenko <kt...@gridgain.com>
AuthorDate: Thu Jul 23 17:01:45 2020 +0300

    IGNITE-13269 Waiting for operation completion is added in case of stopping case - Fixes #8056.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../schema/SchemaIndexCachePartitionWorker.java      |  4 ++--
 .../query/schema/SchemaIndexCacheVisitorImpl.java    |  2 +-
 .../query/schema/SchemaOperationWorker.java          | 17 ++++++++++++++++-
 .../cache/index/AbstractSchemaSelfTest.java          | 15 ++++++++++++++-
 .../DynamicColumnsAbstractConcurrentSelfTest.java    | 20 ++++++++++----------
 .../DynamicIndexAbstractConcurrentSelfTest.java      | 20 +++++++++++---------
 6 files changed, 54 insertions(+), 24 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
index b1162da..860f742 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
@@ -62,7 +62,7 @@ public class SchemaIndexCachePartitionWorker extends GridWorker {
     private final AtomicBoolean stop;
 
     /** Cancellation token between all workers for all caches. */
-    private final SchemaIndexOperationCancellationToken cancel;
+    @Nullable private final SchemaIndexOperationCancellationToken cancel;
 
     /** Index closure. */
     private final SchemaIndexCacheVisitorClosureWrapper wrappedClo;
@@ -91,7 +91,7 @@ public class SchemaIndexCachePartitionWorker extends GridWorker {
         GridCacheContext cctx,
         GridDhtLocalPartition locPart,
         AtomicBoolean stop,
-        SchemaIndexOperationCancellationToken cancel,
+        @Nullable SchemaIndexOperationCancellationToken cancel,
         SchemaIndexCacheVisitorClosure clo,
         GridFutureAdapter<SchemaIndexCacheStat> fut,
         AtomicInteger partsCnt
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
index 3b1fbb5..25af441 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -55,7 +55,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
     private final GridCacheContext cctx;
 
     /** Cancellation token. */
-    private final SchemaIndexOperationCancellationToken cancel;
+    @Nullable private final SchemaIndexOperationCancellationToken cancel;
 
     /** Future for create/rebuild index. */
     protected final GridFutureAdapter<Void> buildIdxFut;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
index dfcf391..5f58c6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.schema;
 
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -28,6 +29,7 @@ import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstra
 import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.internal.worker.WorkersRegistry;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
@@ -67,6 +69,9 @@ public class SchemaOperationWorker extends GridWorker {
     /** Cancellation token. */
     private final SchemaIndexOperationCancellationToken cancelToken = new SchemaIndexOperationCancellationToken();
 
+    /** Workers registry. */
+    private final WorkersRegistry workersRegistry;
+
     /**
      * Constructor.
      *
@@ -90,6 +95,7 @@ public class SchemaOperationWorker extends GridWorker {
         this.nop = nop;
         this.cacheRegistered = cacheRegistered;
         this.type = type;
+        this.workersRegistry = ctx.workersRegistry();
 
         fut = new GridFutureAdapter();
 
@@ -178,8 +184,17 @@ public class SchemaOperationWorker extends GridWorker {
      * Cancel operation.
      */
     @Override public void cancel() {
-        if (cancelToken.cancel())
+        if (cancelToken.cancel()) {
+            try {
+                fut.get(workersRegistry.getSystemWorkerBlockedTimeout());
+            }
+            catch (IgniteCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.error("Error completing operation", e);
+            }
+
             super.cancel();
+        }
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
index 523b130..12411aa 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
@@ -554,10 +555,22 @@ public abstract class AbstractSchemaSelfTest extends AbstractIndexingCommonTest
 
     /**
      * Destroy SQL cache on given node.
+     *
      * @param node Node to create cache on.
+     * @throws IgniteCheckedException if failed.
      */
     protected void destroySqlCache(Ignite node) throws IgniteCheckedException {
-        ((IgniteEx)node).context().cache().dynamicDestroyCache(CACHE_NAME, true, true, false, null).get();
+        destroySqlCacheFuture(node).get();
+    }
+
+    /**
+     * Starts destroying the SQL cache and returns the future of that operation.
+     *
+     * @param node Node to destroy cache on.
+     * @return Future that will be completed when cache is destroyed.
+     */
+    protected IgniteInternalFuture<Boolean> destroySqlCacheFuture(Ignite node) {
+        return ((IgniteEx)node).context().cache().dynamicDestroyCache(CACHE_NAME, true, true, false, null);
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
index d3bef3e..59876b5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.GridTestUtils.RunnableX;
 import org.junit.Test;
 
@@ -618,20 +619,19 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
 
         idxLatch.await();
 
-        // Destroy cache (drop table).
-        run(cli, DROP_SQL);
+        // Start destroy cache (drop table).
+        IgniteInternalFuture<List<List<?>>> dropFut = GridTestUtils.runAsync(() -> run(cli, DROP_SQL));
+
+        U.sleep(2_000);
+
+        assertFalse(idxFut.isDone());
+        assertFalse(dropFut.isDone());
 
         // Unblock indexing and see what happens.
         unblockIndexing(srv1);
 
-        try {
-            idxFut.get();
-
-            fail("Exception has not been thrown.");
-        }
-        catch (SchemaOperationException e) {
-            // No-op.
-        }
+        idxFut.get();
+        dropFut.get();
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
index 5001296..f6efc85 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
@@ -49,7 +49,9 @@ import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
 import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
 import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Test;
 
@@ -466,20 +468,20 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde
 
         idxLatch.await();
 
-        // Destroy cache (drop table).
-        destroySqlCache(cli);
+        // Start destroy cache (drop table).
+        IgniteInternalFuture<Boolean> desFut = destroySqlCacheFuture(cli);
+
+        U.sleep(2_000);
+
+        assertFalse(idxFut.isDone());
+        assertFalse(desFut.isDone());
 
         // Unblock indexing and see what happens.
         unblockIndexing(srv1);
 
-        try {
-            idxFut.get();
+        GridTestUtils.assertThrows(log, (Callable<?>)idxFut::get, SchemaOperationException.class, null);
 
-            fail("Exception has not been thrown.");
-        }
-        catch (SchemaOperationException e) {
-            // No-op.
-        }
+        assertTrue(desFut.get());
     }
 
     /**