You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/06/19 00:10:09 UTC

[kafka] branch trunk updated: MINOR: Add reset to SnapshotRegistry and Revertable (#10891)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c333bfd  MINOR: Add reset to SnapshotRegistry and Revertable (#10891)
c333bfd is described below

commit c333bfd41766bf33f41b5d32d8959ebbeff240b4
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Fri Jun 18 17:08:54 2021 -0700

    MINOR: Add reset to SnapshotRegistry and Revertable (#10891)
    
    Add reset functionality to SnapshotRegitry and Revertable, so that we can
    clear the current state before loading a snapshot.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../kafka/controller/ProducerIdControlManager.java |  2 +-
 .../java/org/apache/kafka/timeline/Revertable.java |  5 +++++
 .../apache/kafka/timeline/SnapshotRegistry.java    | 23 ++++++++++++++++++++++
 .../kafka/timeline/SnapshottableHashTable.java     | 10 ++++++++++
 .../org/apache/kafka/timeline/TimelineHashMap.java |  6 +-----
 .../org/apache/kafka/timeline/TimelineHashSet.java |  7 +------
 .../org/apache/kafka/timeline/TimelineInteger.java | 13 ++++++++++--
 .../org/apache/kafka/timeline/TimelineLong.java    | 17 ++++++++++------
 .../kafka/timeline/SnapshottableHashTableTest.java | 21 ++++++++++++++++++++
 .../apache/kafka/timeline/TimelineIntegerTest.java | 17 ++++++++++++++++
 .../apache/kafka/timeline/TimelineLongTest.java    | 17 ++++++++++++++++
 11 files changed, 118 insertions(+), 20 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
index 924605c..7291f93 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
@@ -37,7 +37,7 @@ public class ProducerIdControlManager {
 
     ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) {
         this.clusterControlManager = clusterControlManager;
-        this.lastProducerId = new TimelineLong(snapshotRegistry, 0L);
+        this.lastProducerId = new TimelineLong(snapshotRegistry);
     }
 
     ControllerResult<ProducerIdsBlock> generateNextProducerId(int brokerId, long brokerEpoch) {
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/Revertable.java b/metadata/src/main/java/org/apache/kafka/timeline/Revertable.java
index f7ead35..43eb117 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/Revertable.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/Revertable.java
@@ -29,4 +29,9 @@ interface Revertable {
      * @param delta         The delta associated with this epoch for this object.
      */
     void executeRevert(long targetEpoch, Delta delta);
+
+    /**
+     * Reverts to the initial value.
+     */
+    void reset();
 }
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index 245014f..b34acee 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -105,6 +105,11 @@ public class SnapshotRegistry {
      */
     private final Snapshot head = new Snapshot(Long.MIN_VALUE);
 
+    /**
+     * Collection of all Revertable registered with this registry
+     */
+    private final List<Revertable> revertables = new ArrayList<>();
+
     public SnapshotRegistry(LogContext logContext) {
         this.log = logContext.logger(SnapshotRegistry.class);
     }
@@ -254,4 +259,22 @@ public class SnapshotRegistry {
     public long latestEpoch() {
         return head.prev().epoch();
     }
+
+    /**
+     * Associate with this registry.
+     */
+    public void register(Revertable revertable) {
+        revertables.add(revertable);
+    }
+
+    /**
+     * Delete all snapshots and resets all of the Revertable object registered.
+     */
+    public void reset() {
+        deleteSnapshotsUpTo(LATEST_EPOCH);
+
+        for (Revertable revertable : revertables) {
+            revertable.reset();
+        }
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index 2f5d7be..cbd0a28 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -280,6 +280,7 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
     SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int expectedSize) {
         super(expectedSize);
         this.snapshotRegistry = snapshotRegistry;
+        snapshotRegistry.register(this);
     }
 
     int snapshottableSize(long epoch) {
@@ -452,4 +453,13 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
             }
         }
     }
+
+    @Override
+    public void reset() {
+        Iterator<T> iter = snapshottableIterator(SnapshottableHashTable.LATEST_EPOCH);
+        while (iter.hasNext()) {
+            iter.next();
+            iter.remove();
+        }
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
index 6e02517..855e7ed 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
@@ -179,11 +179,7 @@ public class TimelineHashMap<K, V>
 
     @Override
     public void clear() {
-        Iterator<TimelineHashMapEntry<K, V>> iter = snapshottableIterator(SnapshottableHashTable.LATEST_EPOCH);
-        while (iter.hasNext()) {
-            iter.next();
-            iter.remove();
-        }
+        reset();
     }
 
     final class KeySet extends AbstractSet<K> {
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java
index 73ac0e4..34efb10 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java
@@ -225,12 +225,7 @@ public class TimelineHashSet<T>
 
     @Override
     public void clear() {
-        Iterator<TimelineHashSetEntry<T>> iter =
-            snapshottableIterator(SnapshottableHashTable.LATEST_EPOCH);
-        while (iter.hasNext()) {
-            iter.next();
-            iter.remove();
-        }
+        reset();
     }
 
     @Override
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineInteger.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
index d28db49..d158890 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
@@ -26,8 +26,10 @@ import java.util.Iterator;
  * This class requires external synchronization.
  */
 public class TimelineInteger implements Revertable {
+    public static final int INIT = 0;
+
     static class IntegerContainer implements Delta {
-        private int value = 0;
+        private int value = INIT;
 
         int value() {
             return value;
@@ -48,7 +50,9 @@ public class TimelineInteger implements Revertable {
 
     public TimelineInteger(SnapshotRegistry snapshotRegistry) {
         this.snapshotRegistry = snapshotRegistry;
-        this.value = 0;
+        this.value = INIT;
+
+        snapshotRegistry.register(this);
     }
 
     public int get() {
@@ -96,6 +100,11 @@ public class TimelineInteger implements Revertable {
     }
 
     @Override
+    public void reset() {
+        set(INIT);
+    }
+
+    @Override
     public int hashCode() {
         return value;
     }
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java
index 36a300f..9b401db 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java
@@ -26,8 +26,10 @@ import java.util.Iterator;
  * This class requires external synchronization.
  */
 public class TimelineLong implements Revertable {
+    public static final long INIT = 0;
+
     static class LongContainer implements Delta {
-        private long value = 0;
+        private long value = INIT;
 
         long value() {
             return value;
@@ -47,12 +49,10 @@ public class TimelineLong implements Revertable {
     private long value;
 
     public TimelineLong(SnapshotRegistry snapshotRegistry) {
-        this(snapshotRegistry, 0L);
-    }
-
-    public TimelineLong(SnapshotRegistry snapshotRegistry, long value) {
         this.snapshotRegistry = snapshotRegistry;
-        this.value = value;
+        this.value = INIT;
+
+        snapshotRegistry.register(this);
     }
 
     public long get() {
@@ -100,6 +100,11 @@ public class TimelineLong implements Revertable {
     }
 
     @Override
+    public void reset() {
+        set(INIT);
+    }
+
+    @Override
     public int hashCode() {
         return ((int) value) ^ (int) (value >>> 32);
     }
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
index ef9405cc..972ff58 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.timeline;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -207,6 +208,26 @@ public class SnapshottableHashTableTest {
         assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE), E_1A, E_2A, E_3A);
     }
 
+    @Test
+    public void testReset() {
+        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
+        SnapshottableHashTable<TestElement> table =
+            new SnapshottableHashTable<>(registry, 1);
+        assertEquals(null, table.snapshottableAddOrReplace(E_1A));
+        assertEquals(null, table.snapshottableAddOrReplace(E_2A));
+        assertEquals(null, table.snapshottableAddOrReplace(E_3A));
+        registry.createSnapshot(0);
+        assertEquals(E_1A, table.snapshottableAddOrReplace(E_1B));
+        assertEquals(E_3A, table.snapshottableAddOrReplace(E_3B));
+        registry.createSnapshot(1);
+
+        registry.reset();
+
+        assertEquals(Collections.emptyList(), registry.epochsList());
+        // Check that the table is empty
+        assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE));
+    }
+
     /**
      * Assert that the given iterator contains the given elements, in any order.
      * We compare using reference equality here, rather than object equality.
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/TimelineIntegerTest.java b/metadata/src/test/java/org/apache/kafka/timeline/TimelineIntegerTest.java
index c2a84c6..13a5d35 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/TimelineIntegerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/TimelineIntegerTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.timeline;
 
+import java.util.Collections;
+
 import org.apache.kafka.common.utils.LogContext;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -70,4 +72,19 @@ public class TimelineIntegerTest {
         registry.revertToSnapshot(2);
         assertEquals(0, integer.get());
     }
+
+    @Test
+    public void testReset() {
+        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
+        TimelineInteger value = new TimelineInteger(registry);
+        registry.createSnapshot(2);
+        value.set(1);
+        registry.createSnapshot(3);
+        value.set(2);
+
+        registry.reset();
+
+        assertEquals(Collections.emptyList(), registry.epochsList());
+        assertEquals(TimelineInteger.INIT, value.get());
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/TimelineLongTest.java b/metadata/src/test/java/org/apache/kafka/timeline/TimelineLongTest.java
index 378c6c6..10ce566 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/TimelineLongTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/TimelineLongTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.timeline;
 
+import java.util.Collections;
+
 import org.apache.kafka.common.utils.LogContext;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -70,4 +72,19 @@ public class TimelineLongTest {
         registry.revertToSnapshot(2);
         assertEquals(0L, value.get());
     }
+
+    @Test
+    public void testReset() {
+        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
+        TimelineLong value = new TimelineLong(registry);
+        registry.createSnapshot(2);
+        value.set(1L);
+        registry.createSnapshot(3);
+        value.set(2L);
+
+        registry.reset();
+
+        assertEquals(Collections.emptyList(), registry.epochsList());
+        assertEquals(TimelineLong.INIT, value.get());
+    }
 }