You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/10/24 16:48:39 UTC

[ignite] branch ignite-17964 created (now 05db73defe6)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a change to branch ignite-17964
in repository https://gitbox.apache.org/repos/asf/ignite.git


      at 05db73defe6 Fix the issue.

This branch includes the following new commits:

     new 2c294d20122 Add a test reproducing the issue.
     new 05db73defe6 Fix the issue.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite] 02/02: Fix the issue.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17964
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 05db73defe67187370a036ac4e445130bcfefab4
Author: amashenkov <an...@gmail.com>
AuthorDate: Mon Oct 24 19:48:18 2022 +0300

    Fix the issue.
---
 .../stat/IgniteStatisticsConfigurationManager.java | 116 ++++++++++++++++-----
 1 file changed, 89 insertions(+), 27 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
index b4cdbc38e07..0db8c9e7d36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
@@ -57,8 +58,12 @@ import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnC
 import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
 import org.apache.ignite.internal.processors.query.stat.view.ColumnConfigurationViewSupplier;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 
@@ -132,7 +137,15 @@ public class IgniteStatisticsConfigurationManager {
                         for (Consumer<StatisticsObjectConfiguration> subscriber : subscribers)
                             subscriber.accept(newStatCfg);
 
-                        mgmtBusyExecutor.execute(() -> updateLocalStatistics(newStatCfg));
+                        mgmtBusyExecutor.execute(() -> {
+                            try {
+                                while (!updateLocalStatisticsAsync((StatisticsObjectConfiguration)newV).get())
+                                    ; // No-op
+                            }
+                            catch (IgniteCheckedException e) {
+                                log.warning("Unexpected error during statistics collection: " + e.getMessage(), e);
+                            }
+                        });
                     }
                 );
             }
@@ -266,7 +279,7 @@ public class IgniteStatisticsConfigurationManager {
      *
      * @param cfg Statistics object configuration to update statistics by.
      */
-    private void updateLocalStatistics(StatisticsObjectConfiguration cfg) {
+    private IgniteInternalFuture<Boolean> updateLocalStatisticsAsync(StatisticsObjectConfiguration cfg) {
         TableDescriptor tbl = schemaMgr.table(cfg.key().schema(), cfg.key().obj());
         GridQueryTypeDescriptor typeDesc = tbl != null ? tbl.type() : null;
         GridCacheContextInfo<?, ?> cacheInfo = tbl != null ? tbl.cacheInfo() : null;
@@ -291,17 +304,17 @@ public class IgniteStatisticsConfigurationManager {
                 if (log.isDebugEnabled())
                     log.debug("Removing config for non existing object " + cfg.key());
 
-                dropStatistics(Collections.singletonList(new StatisticsTarget(cfg.key())), false);
+                return dropStatisticsAsync(Collections.singletonList(new StatisticsTarget(cfg.key())), false);
             }
 
-            return;
+            return new GridFinishedFuture<>(true);
         }
 
         if (cctx == null || !cctx.gate().enterIfNotStopped()) {
             if (log.isDebugEnabled())
                 log.debug("Unable to lock table by key " + cfg.key() + ". Skipping statistics collection.");
 
-            return;
+            return new GridFinishedFuture<>(true);
         }
 
         try {
@@ -320,6 +333,8 @@ public class IgniteStatisticsConfigurationManager {
         finally {
             cctx.gate().leave();
         }
+
+        return new GridFinishedFuture<>(true);
     }
 
     /**
@@ -358,10 +373,19 @@ public class IgniteStatisticsConfigurationManager {
      */
     public void updateAllLocalStatistics() {
         try {
+            GridCompoundFuture<Boolean, Boolean> compoundFuture = new GridCompoundFuture<>(CU.boolReducer());
+
             distrMetaStorage.iterate(STAT_OBJ_PREFIX, (k, v) -> {
                 StatisticsObjectConfiguration cfg = (StatisticsObjectConfiguration)v;
 
-                updateLocalStatistics(cfg);
+                compoundFuture.add(updateLocalStatisticsAsync(cfg));
+            });
+
+            compoundFuture.markInitialized();
+
+            compoundFuture.listen(future -> {
+                if (future.error() == null && !future.result())
+                    mgmtBusyExecutor.execute(this::updateAllLocalStatistics);
             });
         }
         catch (IgniteCheckedException e) {
@@ -438,38 +462,76 @@ public class IgniteStatisticsConfigurationManager {
      * @param validate if {@code true} - validate statistics existence, otherwise - just try to remove.
      */
     public void dropStatistics(List<StatisticsTarget> targets, boolean validate) {
+        try {
+            while (!dropStatisticsAsync(targets, validate).get())
+                ; // No-op
+        }
+        catch (IgniteCheckedException ex) {
+            if (ex.getCause() instanceof IgniteSQLException)
+                throw (IgniteSQLException)ex.getCause();
+
+            throw new IgniteSQLException("Error occurs while updating statistics schema",
+                    IgniteQueryErrorCode.UNKNOWN, ex);
+        }
+    }
+
+    /**
+     * Drop local statistic for specified database objects on the cluster.
+     * Remove local aggregated and partitioned statistics that are stored at the local metastorage.
+     *
+     * @param targets  DB objects to update statistics by.
+     * @param validate if {@code true} - validate statistics existence, otherwise - just try to remove.
+     */
+    public IgniteInternalFuture<Boolean> dropStatisticsAsync(List<StatisticsTarget> targets, boolean validate) {
         if (log.isDebugEnabled())
             log.debug("Drop statistics [targets=" + targets + ']');
 
+        GridFutureAdapter<Boolean> resultFuture = new GridFutureAdapter<>();
+        IgniteInternalFuture<Boolean> chainFuture = new GridFinishedFuture<>(true);
+
         for (StatisticsTarget target : targets) {
-            String key = key2String(target.key());
+            chainFuture = chainFuture.chainCompose(f -> {
+                if (f.error() == null && f.result() == Boolean.TRUE)
+                    return removeFromMetastore(target, validate);
 
-            try {
-                while (true) {
-                    StatisticsObjectConfiguration oldCfg = distrMetaStorage.read(key);
+                return f;
+            });
+        }
 
-                    if (validate)
-                        validateDropRefresh(target, oldCfg);
+        chainFuture.listen(f -> {
+            if (f.error() != null)
+                resultFuture.onDone(f.error());
+            else
+                resultFuture.onDone(f.result() == null || f.result().booleanValue());
+        });
 
-                    if (oldCfg == null)
-                        return;
+        return resultFuture;
+    }
 
-                    Set<String> dropColNames = (target.columns() == null) ? Collections.emptySet() :
-                        Arrays.stream(target.columns()).collect(Collectors.toSet());
+    private IgniteInternalFuture<Boolean> removeFromMetastore(StatisticsTarget target, boolean validate) {
+        String key = key2String(target.key());
 
-                    StatisticsObjectConfiguration newCfg = oldCfg.dropColumns(dropColNames);
+        try {
+            StatisticsObjectConfiguration oldCfg = distrMetaStorage.read(key);
 
-                    if (oldCfg.equals(newCfg))
-                        break;
+            if (validate)
+                validateDropRefresh(target, oldCfg);
 
-                    if (distrMetaStorage.compareAndSet(key, oldCfg, newCfg))
-                        break;
-                }
-            }
-            catch (IgniteCheckedException ex) {
-                throw new IgniteSQLException(
-                    "Error on get or update statistic schema", IgniteQueryErrorCode.UNKNOWN, ex);
-            }
+            if (oldCfg == null)
+                return new GridFinishedFuture<>(null); // Stop future chaining. Another thread/node is making progress.
+
+            Set<String> dropColNames = (target.columns() == null) ? Collections.emptySet() :
+                                               Arrays.stream(target.columns()).collect(Collectors.toSet());
+
+            StatisticsObjectConfiguration newCfg = oldCfg.dropColumns(dropColNames);
+
+            if (oldCfg.equals(newCfg))
+                return new GridFinishedFuture<>(true); //Skip. Nothing to do.
+
+            return distrMetaStorage.compareAndSetAsync(key, oldCfg, newCfg);
+        }
+        catch (Throwable ex) {
+            return new GridFinishedFuture<>(ex);
         }
     }
 


[ignite] 01/02: Add a test reproducing the issue.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17964
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 2c294d201220c0b34c255919aa493e6bc3647f41
Author: amashenkov <an...@gmail.com>
AuthorDate: Mon Oct 24 19:47:55 2022 +0300

    Add a test reproducing the issue.
---
 .../query/stat/StatisticsConfigurationTest.java    | 44 ++++++++++++++++++++++
 1 file changed, 44 insertions(+)

diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
index b814de5a51b..eced0606697 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
@@ -394,6 +394,50 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest {
             (stats) -> stats.forEach(s -> assertNull("Invalid stats: " + stats, s.columnStatistics("A"))));
     }
 
+    /**
+     * Checks that orphan record cleanup on activation doesn't cause the grid to hang.
+     * - Start the grid, create table and collect statistics.
+     * - Ensure statistics for the table exists.
+     * - Disable StatisticsManagerConfiguration to prevent configuration changes in metastorage.
+     * - Drop table.
+     * - Re-activate the grid.
+     * - Ensure statistics for the table were dropped as well.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testOrphanDataCleanup() throws Exception {
+        startGrids(2);
+
+        grid(0).cluster().state(ClusterState.ACTIVE);
+
+        createSmallTable(null);
+
+        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
+
+        waitForStats(SCHEMA, "SMALL", TIMEOUT, checkTotalRows, checkColumStats);
+
+        // Stop StatisticsConfigurationManager with all its listeners, so metastorage won't be updated.
+        statisticsMgr(0).statisticConfiguration().stop();
+        statisticsMgr(1).statisticConfiguration().stop();
+
+        dropSmallTable(null);
+
+        waitForStats(SCHEMA, "SMALL", TIMEOUT, (stats) -> stats.forEach(s -> assertNotNull(s)));
+
+        checkStatisticsInMetastore(grid(0).context().cache().context().database(), TIMEOUT,
+                SCHEMA, "SMALL", (s -> assertNotNull(s.data().get("A"))));
+
+        // Restart StatisticsConfigurationManager and trigger cleanup of the orphan record in metastorage.
+        grid(0).cluster().state(ClusterState.INACTIVE);
+        grid(0).cluster().state(ClusterState.ACTIVE);
+
+        waitForStats(SCHEMA, "SMALL", TIMEOUT, (stats) -> stats.forEach(s -> assertNull(s)));
+
+        checkStatisticsInMetastore(grid(0).context().cache().context().database(), TIMEOUT,
+                SCHEMA, "SMALL", (s -> assertNull(s.data().get("A"))));
+    }
+
     /**
      * Check drop statistics when table is dropped.
      */