You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2020/12/11 12:24:12 UTC
[ignite] branch master updated: IGNITE-13101 Metastore should
complete all write futures during stop and prohibit creating new ones -
Fixes #8554.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a082ea0 IGNITE-13101 Metastore should complete all write futures during stop and prohibit creating new ones - Fixes #8554.
a082ea0 is described below
commit a082ea051ce8c9a923f1c1368e119e3fce25c3ca
Author: ibessonov <be...@gmail.com>
AuthorDate: Fri Dec 11 15:10:26 2020 +0300
IGNITE-13101 Metastore should complete all write futures during stop and prohibit creating new ones - Fixes #8554.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../persistence/DistributedMetaStorageImpl.java | 105 +++++++++++++++++----
.../processors/metric/GridMetricManager.java | 5 +-
.../metastorage/DistributedMetaStorageTest.java | 16 ++++
3 files changed, 109 insertions(+), 17 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
index 4ffd0ca..880f645 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
@@ -41,6 +42,7 @@ import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -59,12 +61,14 @@ import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageL
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
@@ -175,6 +179,12 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
*/
private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> updateFuts = new ConcurrentHashMap<>();
+ /** Lock that prevents registering new update futures while existing ones are being cancelled. */
+ private final ReadWriteLock updateFutsStopLock = new ReentrantReadWriteLock();
+
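+ /** If {@code true}, new write futures are completed with an error. Guarded by {@link #updateFutsStopLock}. */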
+ private boolean stopped;
+
/**
* Lock to access/update data and component's state.
*/
@@ -287,7 +297,7 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
finally {
lock.writeLock().unlock();
- cancelUpdateFutures();
+ cancelUpdateFutures(nodeStoppingException(), true);
}
}
@@ -914,7 +924,7 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
ver = INITIAL_VERSION;
- cancelUpdateFutures();
+ cancelUpdateFutures(new IgniteCheckedException("Client was disconnected during the operation."), false);
}
finally {
lock.writeLock().unlock();
@@ -924,13 +934,28 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
/**
* Cancel all waiting futures and clear the map.
*/
- private void cancelUpdateFutures() {
- for (GridFutureAdapter<Boolean> fut : updateFuts.values())
- fut.onDone(new IgniteCheckedException("Client was disconnected during the operation."));
+ private void cancelUpdateFutures(Exception e, boolean stop) {
+ updateFutsStopLock.writeLock().lock();
+
+ try {
+ stopped = stop;
+
+ for (GridFutureAdapter<Boolean> fut : updateFuts.values())
+ fut.onDone(e);
- updateFuts.clear();
+ updateFuts.clear();
+ }
+ finally {
+ updateFutsStopLock.writeLock().unlock();
+ }
+ }
+
+ /** */
+ private static NodeStoppingException nodeStoppingException() {
+ return new NodeStoppingException("Node is stopping.");
}
+
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) {
assert isClient;
@@ -1033,14 +1058,12 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
* @throws IgniteCheckedException If there was an error while sending discovery message.
*/
private GridFutureAdapter<?> startWrite(String key, byte[] valBytes) throws IgniteCheckedException {
- if (!isSupported(ctx))
- throw new IgniteCheckedException(NOT_SUPPORTED_MSG);
-
UUID reqId = UUID.randomUUID();
- GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
+ GridFutureAdapter<?> fut = prepareWriteFuture(key, reqId);
- updateFuts.put(reqId, fut);
+ if (fut.isDone())
+ return fut;
DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes);
@@ -1054,14 +1077,12 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
*/
private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte[] newValBytes)
throws IgniteCheckedException {
- if (!isSupported(ctx))
- throw new IgniteCheckedException(NOT_SUPPORTED_MSG);
-
UUID reqId = UUID.randomUUID();
- GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
+ GridFutureAdapter<Boolean> fut = prepareWriteFuture(key, reqId);
- updateFuts.put(reqId, fut);
+ if (fut.isDone())
+ return fut;
DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes);
@@ -1071,6 +1092,58 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
}
/**
+ * Performs preliminary checks before starting a write or CAS operation.
+ * It also registers the new future in {@link #updateFuts} if all checks pass.
+ *
+ * Tricky part is exception handling from "isSupported" method. It can be thrown by
+ * {@code ZookeeperDiscoveryImpl#checkState()} method, but we can't just leave it as is.
+ * There are components that rely on distributed metastorage throwing {@link NodeStoppingException}.
+ *
+ * @return Never {@code null}: a completed future to return immediately, or a pending one put in {@link #updateFuts}.
+ * @throws IgniteCheckedException If cluster can't perform this update.
+ */
+ private GridFutureAdapter<Boolean> prepareWriteFuture(String key, UUID reqId) throws IgniteCheckedException {
+ boolean supported;
+
+ try {
+ supported = isSupported(ctx);
+ }
+ catch (Exception e) {
+ if (X.hasCause(e, IgniteSpiException.class) && e.getMessage() != null && e.getMessage().contains("Node stopped.")) {
+ GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
+
+ fut.onDone(nodeStoppingException());
+
+ return fut;
+ }
+
+ throw e;
+ }
+
+ if (!supported)
+ throw new IgniteCheckedException(NOT_SUPPORTED_MSG);
+
+ GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
+
+ updateFutsStopLock.readLock().lock();
+
+ try {
+ if (stopped) {
+ fut.onDone(nodeStoppingException());
+
+ return fut;
+ }
+
+ updateFuts.put(reqId, fut);
+ }
+ finally {
+ updateFutsStopLock.readLock().unlock();
+ }
+
+ return fut;
+ }
+
+ /**
* Invoked when {@link DistributedMetaStorageUpdateMessage} received. Attempts to store received data (depends on
* current {@link #bridge} value). Invokes failure handler with critical error if attempt failed for some reason.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
index fddf7ff..267b2a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
@@ -439,8 +439,11 @@ public class GridMetricManager extends GridManagerAdapter<MetricExporterSpi> imp
opsFut.markInitialized();
opsFut.get();
}
+ catch (NodeStoppingException ignored) {
+ // No-op.
+ }
catch (IgniteCheckedException e) {
- throw new IgniteException(e);
+ log.error("Failed to remove metrics configuration.", e);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
index 763a806..298e202 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -31,7 +32,9 @@ import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -133,6 +136,19 @@ public class DistributedMetaStorageTest extends GridCommonAbstractTest {
metastorage.remove("key");
assertNull(metastorage.read("key"));
+
+ stopGrid(0);
+
+ try {
+ metastorage.writeAsync("key", "value").get(10, TimeUnit.SECONDS);
+
+ fail("Exception is expected");
+ }
+ catch (Exception e) {
+ assertTrue(X.hasCause(e, NodeStoppingException.class));
+
+ assertTrue(e.getMessage().contains("Node is stopping."));
+ }
}
/**