You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2020/12/11 12:24:12 UTC

[ignite] branch master updated: IGNITE-13101 Metastore should complete all write futures during stop and prohibit creating new ones - Fixes #8554.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a082ea0  IGNITE-13101 Metastore should complete all write futures during stop and prohibit creating new ones - Fixes #8554.
a082ea0 is described below

commit a082ea051ce8c9a923f1c1368e119e3fce25c3ca
Author: ibessonov <be...@gmail.com>
AuthorDate: Fri Dec 11 15:10:26 2020 +0300

    IGNITE-13101 Metastore should complete all write futures during stop and prohibit creating new ones - Fixes #8554.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../persistence/DistributedMetaStorageImpl.java    | 105 +++++++++++++++++----
 .../processors/metric/GridMetricManager.java       |   5 +-
 .../metastorage/DistributedMetaStorageTest.java    |  16 ++++
 3 files changed, 109 insertions(+), 17 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
index 4ffd0ca..880f645 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
@@ -28,6 +28,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiConsumer;
 import java.util.function.Predicate;
@@ -41,6 +42,7 @@ import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -59,12 +61,14 @@ import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageL
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
@@ -175,6 +179,12 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
      */
     private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> updateFuts = new ConcurrentHashMap<>();
 
+    /** */
+    private final ReadWriteLock updateFutsStopLock = new ReentrantReadWriteLock();
+
+    /** */
+    private boolean stopped;
+
     /**
      * Lock to access/update data and component's state.
      */
@@ -287,7 +297,7 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
         finally {
             lock.writeLock().unlock();
 
-            cancelUpdateFutures();
+            cancelUpdateFutures(nodeStoppingException(), true);
         }
     }
 
@@ -914,7 +924,7 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
 
             ver = INITIAL_VERSION;
 
-            cancelUpdateFutures();
+            cancelUpdateFutures(new IgniteCheckedException("Client was disconnected during the operation."), false);
         }
         finally {
             lock.writeLock().unlock();
@@ -924,13 +934,28 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
     /**
      * Cancel all waiting futures and clear the map.
      */
-    private void cancelUpdateFutures() {
-        for (GridFutureAdapter<Boolean> fut : updateFuts.values())
-            fut.onDone(new IgniteCheckedException("Client was disconnected during the operation."));
+    private void cancelUpdateFutures(Exception e, boolean stop) {
+        updateFutsStopLock.writeLock().lock();
+
+        try {
+            stopped = stop;
+
+            for (GridFutureAdapter<Boolean> fut : updateFuts.values())
+                fut.onDone(e);
 
-        updateFuts.clear();
+            updateFuts.clear();
+        }
+        finally {
+            updateFutsStopLock.writeLock().unlock();
+        }
+    }
+
+    /** */
+    private static NodeStoppingException nodeStoppingException() {
+        return new NodeStoppingException("Node is stopping.");
     }
 
+
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) {
         assert isClient;
@@ -1033,14 +1058,12 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
      * @throws IgniteCheckedException If there was an error while sending discovery message.
      */
     private GridFutureAdapter<?> startWrite(String key, byte[] valBytes) throws IgniteCheckedException {
-       if (!isSupported(ctx))
-            throw new IgniteCheckedException(NOT_SUPPORTED_MSG);
-
         UUID reqId = UUID.randomUUID();
 
-        GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
+        GridFutureAdapter<?> fut = prepareWriteFuture(key, reqId);
 
-        updateFuts.put(reqId, fut);
+        if (fut.isDone())
+            return fut;
 
         DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes);
 
@@ -1054,14 +1077,12 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
      */
     private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte[] newValBytes)
         throws IgniteCheckedException {
-         if (!isSupported(ctx))
-            throw new IgniteCheckedException(NOT_SUPPORTED_MSG);
-
         UUID reqId = UUID.randomUUID();
 
-        GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
+        GridFutureAdapter<Boolean> fut = prepareWriteFuture(key, reqId);
 
-        updateFuts.put(reqId, fut);
+        if (fut.isDone())
+            return fut;
 
         DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes);
 
@@ -1071,6 +1092,58 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
     }
 
     /**
+     * This method will perform some preliminary checks before starting write or cas operation.
+     * If all checks pass, it also registers the new future in {@link #updateFuts}.
+     *
+     * The tricky part is handling exceptions thrown from the {@code isSupported} method. Such an exception can be
+     * thrown by {@code ZookeeperDiscoveryImpl#checkState()}, but it can't be propagated as is: some components
+     * rely on distributed metastorage failing with {@link NodeStoppingException}, so it is translated into one.
+     *
+     * @return Future for the request, never {@code null}; if it is already completed, it must be returned immediately.
+     * @throws IgniteCheckedException If the cluster can't perform this update.
+     */
+    private GridFutureAdapter<Boolean> prepareWriteFuture(String key, UUID reqId) throws IgniteCheckedException {
+        boolean supported;
+
+        try {
+            supported = isSupported(ctx);
+        }
+        catch (Exception e) {
+            if (X.hasCause(e, IgniteSpiException.class) && e.getMessage() != null && e.getMessage().contains("Node stopped.")) {
+                GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
+
+                fut.onDone(nodeStoppingException());
+
+                return fut;
+            }
+
+            throw e;
+        }
+
+        if (!supported)
+            throw new IgniteCheckedException(NOT_SUPPORTED_MSG);
+
+        GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
+
+        updateFutsStopLock.readLock().lock();
+
+        try {
+            if (stopped) {
+                fut.onDone(nodeStoppingException());
+
+                return fut;
+            }
+
+            updateFuts.put(reqId, fut);
+        }
+        finally {
+            updateFutsStopLock.readLock().unlock();
+        }
+
+        return fut;
+    }
+
+    /**
      * Invoked when {@link DistributedMetaStorageUpdateMessage} received. Attempts to store received data (depends on
      * current {@link #bridge} value). Invokes failure handler with critical error if attempt failed for some reason.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
index fddf7ff..267b2a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
@@ -439,8 +439,11 @@ public class GridMetricManager extends GridManagerAdapter<MetricExporterSpi> imp
             opsFut.markInitialized();
             opsFut.get();
         }
+        catch (NodeStoppingException ignored) {
+            // No-op.
+        }
         catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
+            log.error("Failed to remove metrics configuration.", e);
         }
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
index 763a806..298e202 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -31,7 +32,9 @@ import org.apache.ignite.failure.FailureHandler;
 import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -133,6 +136,19 @@ public class DistributedMetaStorageTest extends GridCommonAbstractTest {
         metastorage.remove("key");
 
         assertNull(metastorage.read("key"));
+
+        stopGrid(0);
+
+        try {
+            metastorage.writeAsync("key", "value").get(10, TimeUnit.SECONDS);
+
+            fail("Exception is expected");
+        }
+        catch (Exception e) {
+            assertTrue(X.hasCause(e, NodeStoppingException.class));
+
+            assertTrue(e.getMessage().contains("Node is stopping."));
+        }
     }
 
     /**