You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2023/07/12 10:38:15 UTC

[ignite-3] branch main updated: IGNITE-18959 Placement driver's local map cleanup on assingments removal (#2292)

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1910afa0b1 IGNITE-18959 Placement driver's local map cleanup on assingments removal (#2292)
1910afa0b1 is described below

commit 1910afa0b1aec6b0f97330532303f237f75db029
Author: Denis Chudov <mo...@gmail.com>
AuthorDate: Wed Jul 12 13:38:09 2023 +0300

    IGNITE-18959 Placement driver's local map cleanup on assingments removal (#2292)
---
 .../ignite/internal/metastorage/MetaStorageManager.java   | 10 ++++++++++
 .../internal/metastorage/impl/MetaStorageManagerImpl.java |  2 ++
 .../internal/placementdriver/AssignmentsTracker.java      |  2 +-
 .../placementdriver/negotiation/LeaseNegotiator.java      | 11 +++++------
 .../java/org/apache/ignite/internal/app/IgniteImpl.java   | 12 ++++++------
 .../ignite/internal/table/distributed/TableManager.java   | 15 +++++++++++++++
 6 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index fb37f7f1ab..54453fbff0 100644
--- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -131,6 +131,16 @@ public interface MetaStorageManager extends IgniteComponent {
      */
     CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals);
 
+    /**
+     * Removes an entry for the given key.
+     */
+    CompletableFuture<Void> remove(ByteArray key);
+
+    /**
+     * Removes entries for given keys.
+     */
+    CompletableFuture<Void> removeAll(Set<ByteArray> keys);
+
     /**
      * Retrieves entries for the given key prefix in lexicographic order. Shortcut for {@link #prefix(ByteArray, long)} where
      * {@code revUpperBound = LATEST_REVISION}.
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 10ec576f1a..8d810e6044 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -638,6 +638,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
      *
      * @see MetaStorageService#remove(ByteArray)
      */
+    @Override
     public CompletableFuture<Void> remove(ByteArray key) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(new NodeStoppingException());
@@ -672,6 +673,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
      *
      * @see MetaStorageService#removeAll(Set)
      */
+    @Override
     public CompletableFuture<Void> removeAll(Set<ByteArray> keys) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(new NodeStoppingException());
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index cc197a44a5..c8a3542421 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -144,7 +144,7 @@ public class AssignmentsTracker {
                 var replicationGrpId = TablePartitionId.fromString(
                         new String(evt.newEntry().key(), StandardCharsets.UTF_8).replace(STABLE_ASSIGNMENTS_PREFIX, ""));
 
-                if (evt.newEntry().empty()) {
+                if (evt.newEntry().tombstone()) {
                     groupAssignments.remove(replicationGrpId);
                 } else {
                     Set<Assignment> prevAssignment = groupAssignments.put(replicationGrpId, ByteUtils.fromBytes(evt.newEntry().value()));
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
index dbd3326fc5..ffcd828fa8 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
@@ -22,6 +22,8 @@ import static org.apache.ignite.internal.placementdriver.negotiation.LeaseAgreem
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.placementdriver.LeaseUpdater;
 import org.apache.ignite.internal.placementdriver.leases.Lease;
 import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
@@ -33,9 +35,8 @@ import org.apache.ignite.network.ClusterService;
  * This class negotiates a lease with leaseholder. If the lease is negotiated, it is ready available to accept.
  */
 public class LeaseNegotiator {
-    // TODO https://issues.apache.org/jira/browse/IGNITE-18959 uncomment
-    ///** The logger. */
-    //private static final IgniteLogger LOG = Loggers.forClass(LeaseNegotiator.class);
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(LeaseNegotiator.class);
 
     private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
 
@@ -83,9 +84,7 @@ public class LeaseNegotiator {
                         leaseInterval)
                 .handle((msg, throwable) -> {
                     if (throwable != null) {
-                        // TODO commented this because of log flooding due to incorrect lease cleanup
-                        // TODO https://issues.apache.org/jira/browse/IGNITE-18959
-                        // LOG.warn("Lease was not negotiated due to exception [lease={}]", throwable, lease);
+                        LOG.warn("Lease was not negotiated due to exception [lease={}]", throwable, lease);
                     } else {
                         assert msg instanceof LeaseGrantedMessageResponse : "Message type is unexpected [type="
                                 + msg.getClass().getSimpleName() + ']';
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index dfd08d83ce..6ba3ece793 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -436,15 +436,17 @@ public class IgniteImpl implements Ignite {
                 distributedConfigurationValidator
         );
 
+        ConfigurationRegistry clusterConfigRegistry = clusterCfgMgr.configurationRegistry();
+
+        TablesConfiguration tablesConfig = clusterConfigRegistry.getConfiguration(TablesConfiguration.KEY);
+
+        DistributionZonesConfiguration zonesConfig = clusterConfigRegistry.getConfiguration(DistributionZonesConfiguration.KEY);
+
         distributedConfigurationUpdater = new DistributedConfigurationUpdater(
                 cmgMgr,
                 new HoconPresentation(clusterCfgMgr.configurationRegistry())
         );
 
-        ConfigurationRegistry clusterConfigRegistry = clusterCfgMgr.configurationRegistry();
-
-        TablesConfiguration tablesConfig = clusterConfigRegistry.getConfiguration(TablesConfiguration.KEY);
-
         metaStorageMgr.configure(clusterConfigRegistry.getConfiguration(MetaStorageConfiguration.KEY));
 
         DistributionZonesConfiguration zonesConfiguration = clusterConfigRegistry.getConfiguration(DistributionZonesConfiguration.KEY);
@@ -481,8 +483,6 @@ public class IgniteImpl implements Ignite {
 
         Path storagePath = getPartitionsStorePath(workDir);
 
-        DistributionZonesConfiguration zonesConfig = clusterConfigRegistry.getConfiguration(DistributionZonesConfiguration.KEY);
-
         GcConfiguration gcConfig = clusterConfigRegistry.getConfiguration(GcConfiguration.KEY);
 
         dataStorageMgr = new DataStorageManager(
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b534b2e831..9e038c48f0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1427,6 +1427,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
             fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, tableId))
                     .whenComplete((v, e) -> {
+                        Set<ByteArray> assignmentKeys = new HashSet<>();
+
+                        for (int p = 0; p < partitions; p++) {
+                            assignmentKeys.add(stablePartAssignmentsKey(new TablePartitionId(tableId, p)));
+                        }
+
+                        metaStorageMgr.removeAll(assignmentKeys);
+
                         if (e != null) {
                             LOG.error("Error on " + TableEvent.DROP + " notification", e);
                         }
@@ -2531,6 +2539,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             return completedFuture(null);
         }
 
+        if (!evt.single()) {
+            // If there is not a single entry, then all entries must be tombstones (this happens after table drop).
+            assert evt.entryEvents().stream().allMatch(entryEvent -> entryEvent.newEntry().tombstone()) : evt;
+
+            return completedFuture(null);
+        }
+
         // here we can receive only update from the rebalance logic
         // these updates always processing only 1 partition, so, only 1 stable partition key.
         assert evt.single() : evt;