Posted to notifications@ignite.apache.org by "rpuch (via GitHub)" <gi...@apache.org> on 2023/04/20 11:25:48 UTC

[GitHub] [ignite-3] rpuch commented on a diff in pull request #1960: IGNITE-19326 Close partition safe time trackers

rpuch commented on code in PR #1960:
URL: https://github.com/apache/ignite-3/pull/1960#discussion_r1172419950


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -101,8 +103,8 @@ public class InternalTableImpl implements InternalTable {
     /** Number of attempts. */
     private static final int ATTEMPTS_TO_ENLIST_PARTITION = 5;
 
-    /** Partition map. */
-    protected volatile Int2ObjectMap<RaftGroupService> partitionMap;
+    /** Map update guard by field {@link #updatePartitionMapsMux}. */

Review Comment:
   ```suggestion
       /** Map update guarded by field {@link #updatePartitionMapsMux}. */
   ```



##########
modules/core/src/main/java/org/apache/ignite/internal/util/TrackerClosedException.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+/**
+ * Exception that will be thrown when the {@link PendingComparableValuesTracker} is closed.
+ */
+public class TrackerClosedException extends RuntimeException {

Review Comment:
   Shouldn't it extend `IgniteException`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -725,8 +725,11 @@ private CompletableFuture<?> updateAssignmentInternal(ConfigurationNotificationE
 
                 placementDriver.updateAssignment(replicaGrpId, newConfiguration.peers().stream().map(Peer::consistentId).collect(toList()));
 
-                PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
-                PendingComparableValuesTracker<Long> storageIndexTracker = new PendingComparableValuesTracker<>(0L);
+                var safeTimeTracker = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
+                var storageIndexTracker = new PendingComparableValuesTracker<>(0L);
+
+                ((InternalTableImpl) internalTbl).updatePartitionSafeTimeTracker(partId, safeTimeTracker);

Review Comment:
   Why is this cast necessary? Isn't it possible to add the methods to the interface?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1054,6 +1061,16 @@ private void cleanUpTablesResources(Map<UUID, TableImpl> tables) {
 
                 CompletableFuture<Void> removeFromGcFuture = mvGc.removeStorage(replicationGroupId);
 
+                stopping.add(() -> {
+                    try {
+                        closeTracker(internalTable.getPartitionSafeTimeTracker(partitionId));
+
+                        closeTracker(internalTable.getPartitionStorageIndexTracker(partitionId));

Review Comment:
   Let's extract these lines into a method; they are duplicated a few times.
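
One possible shape for such a helper, as a minimal sketch only. `Closers` and `closeAll` are hypothetical names that do not exist in the PR, and the sketch uses plain `AutoCloseable` instead of the real tracker type. It skips `null` entries, since the getters in this PR are `@Nullable`, and keeps closing the remaining resources if one of them fails.

```java
/** Hypothetical utility, not part of the PR: closes several resources in one call. */
final class Closers {
    private Closers() {
    }

    /**
     * Closes every non-null resource. If some of them fail, the first failure is rethrown
     * (wrapped in a RuntimeException if it is checked) with the later ones attached as suppressed.
     */
    static void closeAll(AutoCloseable... resources) {
        RuntimeException failure = null;

        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue;
            }

            try {
                resource.close();
            } catch (Exception e) {
                RuntimeException wrapped = e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);

                if (failure == null) {
                    failure = wrapped;
                } else {
                    failure.addSuppressed(wrapped);
                }
            }
        }

        if (failure != null) {
            throw failure;
        }
    }
}
```

With something like this, each duplicated pair of `closeTracker(...)` calls would collapse into a single call that takes both trackers.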



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1470,4 +1478,62 @@ private RuntimeException wrapReplicationException(Throwable e) {
 
         return e0;
     }
+
+    @Override
+    public @Nullable PendingComparableValuesTracker<HybridTimestamp> getPartitionSafeTimeTracker(int partitionId) {
+        return safeTimeTrackerByPartitionId.get(partitionId);
+    }
+
+    @Override
+    public @Nullable PendingComparableValuesTracker<Long> getPartitionStorageIndexTracker(int partitionId) {
+        return storageIndexTrackerByPartitionId.get(partitionId);
+    }
+
+    /**
+     * Updates the partition safe time tracker, if there was a previous one, closes it.
+     *
+     * @param partitionId Partition ID.
+     * @param newSafeTimeTracker New partition safe time tracker.
+     */
+    public void updatePartitionSafeTimeTracker(int partitionId, PendingComparableValuesTracker<HybridTimestamp> newSafeTimeTracker) {
+        PendingComparableValuesTracker<HybridTimestamp> previousSafeTimeTracker;
+
+        synchronized (updatePartitionMapsMux) {
+            Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp>> newSafeTimeTrackerMap = new Int2ObjectOpenHashMap<>(partitions);
+
+            newSafeTimeTrackerMap.putAll(safeTimeTrackerByPartitionId);
+
+            previousSafeTimeTracker = newSafeTimeTrackerMap.put(partitionId, newSafeTimeTracker);
+
+            safeTimeTrackerByPartitionId = newSafeTimeTrackerMap;
+        }
+
+        if (previousSafeTimeTracker != null) {
+            previousSafeTimeTracker.close();

Review Comment:
   Do we have a test that makes sure that replaced trackers are closed?
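
To make the question concrete, here is a rough sketch of the property such a test could check. `FakeTracker` and `TrackerRegistry` are hypothetical stand-ins, not classes from the PR; the registry mirrors the copy-on-write update from the diff but uses `java.util.Map` instead of `Int2ObjectMap` to stay self-contained.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the real tracker; it only records whether close() was called. */
class FakeTracker implements AutoCloseable {
    private volatile boolean closed;

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

/** Hypothetical holder that mirrors the copy-on-write map update from the diff. */
class TrackerRegistry {
    private final Object mux = new Object();

    private volatile Map<Integer, FakeTracker> trackerByPartitionId = Map.of();

    /** Installs a new tracker for the partition and closes the replaced one, if any. */
    void update(int partitionId, FakeTracker newTracker) {
        FakeTracker previous;

        synchronized (mux) {
            Map<Integer, FakeTracker> copy = new HashMap<>(trackerByPartitionId);

            previous = copy.put(partitionId, newTracker);

            trackerByPartitionId = copy;
        }

        // The replaced tracker is closed outside the lock, as in the diff.
        if (previous != null) {
            previous.close();
        }
    }

    FakeTracker get(int partitionId) {
        return trackerByPartitionId.get(partitionId);
    }
}
```

A test along these lines would install a tracker, replace it, and then assert that the first one reports closed while the second one is still open and is the one returned by the getter.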



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -465,4 +466,15 @@ void handleBuildIndexCommand(BuildIndexCommand cmd, long commandIndex, long comm
             );
         }
     }
+
+    private static <T extends Comparable<T>> void updateTrackerWithIgnoreTrackerClosedException(

Review Comment:
   I suggest renaming it to `updateTrackerIgnoringTrackerClosedException`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -195,10 +196,10 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
 
                 assert safeTimePropagatingCommand.safeTime() != null;
 
-                safeTime.update(safeTimePropagatingCommand.safeTime().asHybridTimestamp());
+                updateTrackerWithIgnoreTrackerClosedException(safeTime, safeTimePropagatingCommand.safeTime().asHybridTimestamp());
             }
 
-            storageIndexTracker.update(commandIndex);
+            updateTrackerWithIgnoreTrackerClosedException(storageIndexTracker, commandIndex);

Review Comment:
   `Replica#waitForActualState()` waits for the next index. The `PartitionListener` might execute the command and update the index in the storage, but then fail to update the tracker (and so fail to notify the subscribers, including `Replica#waitForActualState()`). Would `Replica#waitForActualState()` handle that correctly? Maybe we need to ask someone from the TX team.
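
A small sketch of the scenario behind this concern. `ClosableIndexTracker` is a hypothetical, heavily simplified tracker and not the real `PendingComparableValuesTracker`; the assumption illustrated here is that closing the tracker should fail pending waiters, because an update that is ignored after close would otherwise leave them waiting forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical simplified tracker: waiters are completed on update and failed on close. */
class ClosableIndexTracker {
    private static final class Waiter {
        final long target;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Waiter(long target) {
            this.target = target;
        }
    }

    private final List<Waiter> waiters = new ArrayList<>();
    private long current;
    private boolean closed;

    /** Returns a future that completes once the tracked value reaches the target. */
    synchronized CompletableFuture<Void> waitFor(long target) {
        if (closed) {
            return CompletableFuture.failedFuture(new IllegalStateException("Tracker is closed"));
        }
        if (current >= target) {
            return CompletableFuture.completedFuture(null);
        }

        Waiter waiter = new Waiter(target);
        waiters.add(waiter);
        return waiter.future;
    }

    /** Advances the value and completes satisfied waiters; throws if the tracker is closed. */
    synchronized void update(long newValue) {
        if (closed) {
            throw new IllegalStateException("Tracker is closed");
        }

        current = Math.max(current, newValue);

        waiters.removeIf(waiter -> {
            if (waiter.target <= current) {
                waiter.future.complete(null);
                return true;
            }
            return false;
        });
    }

    /** Fails all pending waiters so that nobody is left waiting on a dead tracker. */
    synchronized void close() {
        closed = true;

        for (Waiter waiter : waiters) {
            waiter.future.completeExceptionally(new IllegalStateException("Tracker is closed"));
        }
        waiters.clear();
    }
}
```

If the real tracker behaves like this on close, a waiter sees an exception rather than hanging; whether `Replica#waitForActualState()` treats that exception correctly is the open question.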



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -725,8 +725,11 @@ private CompletableFuture<?> updateAssignmentInternal(ConfigurationNotificationE
 
                 placementDriver.updateAssignment(replicaGrpId, newConfiguration.peers().stream().map(Peer::consistentId).collect(toList()));
 
-                PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
-                PendingComparableValuesTracker<Long> storageIndexTracker = new PendingComparableValuesTracker<>(0L);
+                var safeTimeTracker = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
+                var storageIndexTracker = new PendingComparableValuesTracker<>(0L);
+
+                ((InternalTableImpl) internalTbl).updatePartitionSafeTimeTracker(partId, safeTimeTracker);
+                ((InternalTableImpl) internalTbl).updatePartitionStorageIndexTracker(partId, storageIndexTracker);

Review Comment:
   These two methods are always called together. Does it make sense to merge them into one?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -128,15 +130,21 @@ public class InternalTableImpl implements InternalTable {
     /** Replica service. */
     private final ReplicaService replicaSvc;
 
-    /** Mutex for the partition map update. */
-    private final Object updatePartMapMux = new Object();
+    /** Mutex for the partition maps update. */
+    private final Object updatePartitionMapsMux = new Object();
 
     /** Table messages factory. */
     private final TableMessagesFactory tableMessagesFactory;
 
     /** A hybrid logical clock. */
     private final HybridClock clock;
 
+    /** Map update guard by field {@link #updatePartitionMapsMux}. */
+    private volatile Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp>> safeTimeTrackerByPartitionId = emptyMap();
+
+    /** Map update guard by field {@link #updatePartitionMapsMux}. */
+    private volatile Int2ObjectMap<PendingComparableValuesTracker<Long>> storageIndexTrackerByPartitionId = emptyMap();

Review Comment:
   ```suggestion
       /** Map update guarded by field {@link #updatePartitionMapsMux}. */
       private volatile Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp>> safeTimeTrackerByPartitionId = emptyMap();
   
       /** Map update guarded by field {@link #updatePartitionMapsMux}. */
       private volatile Int2ObjectMap<PendingComparableValuesTracker<Long>> storageIndexTrackerByPartitionId = emptyMap();
   ```


