You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2020/07/23 14:29:58 UTC
[ignite] branch master updated: IGNITE-13269 Waiting for operation
completion is added in case of stopping case - Fixes #8056.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9a17f4a IGNITE-13269 Waiting for operation completion is added in case of stopping case - Fixes #8056.
9a17f4a is described below
commit 9a17f4a2e29e7f848e9c1f6fd480ad6ee345c6fe
Author: ktkalenko <kt...@gridgain.com>
AuthorDate: Thu Jul 23 17:01:45 2020 +0300
IGNITE-13269 Waiting for operation completion is added in case of stopping case - Fixes #8056.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../schema/SchemaIndexCachePartitionWorker.java | 4 ++--
.../query/schema/SchemaIndexCacheVisitorImpl.java | 2 +-
.../query/schema/SchemaOperationWorker.java | 17 ++++++++++++++++-
.../cache/index/AbstractSchemaSelfTest.java | 15 ++++++++++++++-
.../DynamicColumnsAbstractConcurrentSelfTest.java | 20 ++++++++++----------
.../DynamicIndexAbstractConcurrentSelfTest.java | 20 +++++++++++---------
6 files changed, 54 insertions(+), 24 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
index b1162da..860f742 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
@@ -62,7 +62,7 @@ public class SchemaIndexCachePartitionWorker extends GridWorker {
private final AtomicBoolean stop;
/** Cancellation token between all workers for all caches. */
- private final SchemaIndexOperationCancellationToken cancel;
+ @Nullable private final SchemaIndexOperationCancellationToken cancel;
/** Index closure. */
private final SchemaIndexCacheVisitorClosureWrapper wrappedClo;
@@ -91,7 +91,7 @@ public class SchemaIndexCachePartitionWorker extends GridWorker {
GridCacheContext cctx,
GridDhtLocalPartition locPart,
AtomicBoolean stop,
- SchemaIndexOperationCancellationToken cancel,
+ @Nullable SchemaIndexOperationCancellationToken cancel,
SchemaIndexCacheVisitorClosure clo,
GridFutureAdapter<SchemaIndexCacheStat> fut,
AtomicInteger partsCnt
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
index 3b1fbb5..25af441 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -55,7 +55,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
private final GridCacheContext cctx;
/** Cancellation token. */
- private final SchemaIndexOperationCancellationToken cancel;
+ @Nullable private final SchemaIndexOperationCancellationToken cancel;
/** Future for create/rebuild index. */
protected final GridFutureAdapter<Void> buildIdxFut;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
index dfcf391..5f58c6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.schema;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -28,6 +29,7 @@ import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstra
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
@@ -67,6 +69,9 @@ public class SchemaOperationWorker extends GridWorker {
/** Cancellation token. */
private final SchemaIndexOperationCancellationToken cancelToken = new SchemaIndexOperationCancellationToken();
+ /** Workers registry. */
+ private final WorkersRegistry workersRegistry;
+
/**
* Constructor.
*
@@ -90,6 +95,7 @@ public class SchemaOperationWorker extends GridWorker {
this.nop = nop;
this.cacheRegistered = cacheRegistered;
this.type = type;
+ this.workersRegistry = ctx.workersRegistry();
fut = new GridFutureAdapter();
@@ -178,8 +184,17 @@ public class SchemaOperationWorker extends GridWorker {
* Cancel operation.
*/
@Override public void cancel() {
- if (cancelToken.cancel())
+ if (cancelToken.cancel()) {
+ try {
+ fut.get(workersRegistry.getSystemWorkerBlockedTimeout());
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.error("Error completing operation", e);
+ }
+
super.cancel();
+ }
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
index 523b130..12411aa 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
@@ -554,10 +555,22 @@ public abstract class AbstractSchemaSelfTest extends AbstractIndexingCommonTest
/**
* Destroy SQL cache on given node.
+ *
* @param node Node to create cache on.
+ * @throws IgniteCheckedException if failed.
*/
protected void destroySqlCache(Ignite node) throws IgniteCheckedException {
- ((IgniteEx)node).context().cache().dynamicDestroyCache(CACHE_NAME, true, true, false, null).get();
+ destroySqlCacheFuture(node).get();
+ }
+
+ /**
+ * Starting destroy SQL cache with return of future.
+ *
+ * @param node Node to create cache on.
+ * @return Future that will be completed when cache is destroyed.
+ */
+ protected IgniteInternalFuture<Boolean> destroySqlCacheFuture(Ignite node) {
+ return ((IgniteEx)node).context().cache().dynamicDestroyCache(CACHE_NAME, true, true, false, null);
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
index d3bef3e..59876b5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.RunnableX;
import org.junit.Test;
@@ -618,20 +619,19 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
idxLatch.await();
- // Destroy cache (drop table).
- run(cli, DROP_SQL);
+ // Start destroy cache (drop table).
+ IgniteInternalFuture<List<List<?>>> dropFut = GridTestUtils.runAsync(() -> run(cli, DROP_SQL));
+
+ U.sleep(2_000);
+
+ assertFalse(idxFut.isDone());
+ assertFalse(dropFut.isDone());
// Unblock indexing and see what happens.
unblockIndexing(srv1);
- try {
- idxFut.get();
-
- fail("Exception has not been thrown.");
- }
- catch (SchemaOperationException e) {
- // No-op.
- }
+ idxFut.get();
+ dropFut.get();
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
index 5001296..f6efc85 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
@@ -49,7 +49,9 @@ import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
@@ -466,20 +468,20 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde
idxLatch.await();
- // Destroy cache (drop table).
- destroySqlCache(cli);
+ // Start destroy cache (drop table).
+ IgniteInternalFuture<Boolean> desFut = destroySqlCacheFuture(cli);
+
+ U.sleep(2_000);
+
+ assertFalse(idxFut.isDone());
+ assertFalse(desFut.isDone());
// Unblock indexing and see what happens.
unblockIndexing(srv1);
- try {
- idxFut.get();
+ GridTestUtils.assertThrows(log, (Callable<?>)idxFut::get, SchemaOperationException.class, null);
- fail("Exception has not been thrown.");
- }
- catch (SchemaOperationException e) {
- // No-op.
- }
+ assertTrue(desFut.get());
}
/**