You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/06/19 00:10:09 UTC
[kafka] branch trunk updated: MINOR: Add reset to SnapshotRegistry
and Revertable (#10891)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c333bfd MINOR: Add reset to SnapshotRegistry and Revertable (#10891)
c333bfd is described below
commit c333bfd41766bf33f41b5d32d8959ebbeff240b4
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Fri Jun 18 17:08:54 2021 -0700
MINOR: Add reset to SnapshotRegistry and Revertable (#10891)
Add reset functionality to SnapshotRegitry and Revertable, so that we can
clear the current state before loading a snapshot.
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../kafka/controller/ProducerIdControlManager.java | 2 +-
.../java/org/apache/kafka/timeline/Revertable.java | 5 +++++
.../apache/kafka/timeline/SnapshotRegistry.java | 23 ++++++++++++++++++++++
.../kafka/timeline/SnapshottableHashTable.java | 10 ++++++++++
.../org/apache/kafka/timeline/TimelineHashMap.java | 6 +-----
.../org/apache/kafka/timeline/TimelineHashSet.java | 7 +------
.../org/apache/kafka/timeline/TimelineInteger.java | 13 ++++++++++--
.../org/apache/kafka/timeline/TimelineLong.java | 17 ++++++++++------
.../kafka/timeline/SnapshottableHashTableTest.java | 21 ++++++++++++++++++++
.../apache/kafka/timeline/TimelineIntegerTest.java | 17 ++++++++++++++++
.../apache/kafka/timeline/TimelineLongTest.java | 17 ++++++++++++++++
11 files changed, 118 insertions(+), 20 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
index 924605c..7291f93 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
@@ -37,7 +37,7 @@ public class ProducerIdControlManager {
ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) {
this.clusterControlManager = clusterControlManager;
- this.lastProducerId = new TimelineLong(snapshotRegistry, 0L);
+ this.lastProducerId = new TimelineLong(snapshotRegistry);
}
ControllerResult<ProducerIdsBlock> generateNextProducerId(int brokerId, long brokerEpoch) {
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/Revertable.java b/metadata/src/main/java/org/apache/kafka/timeline/Revertable.java
index f7ead35..43eb117 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/Revertable.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/Revertable.java
@@ -29,4 +29,9 @@ interface Revertable {
* @param delta The delta associated with this epoch for this object.
*/
void executeRevert(long targetEpoch, Delta delta);
+
+ /**
+ * Reverts to the initial value.
+ */
+ void reset();
}
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index 245014f..b34acee 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -105,6 +105,11 @@ public class SnapshotRegistry {
*/
private final Snapshot head = new Snapshot(Long.MIN_VALUE);
+ /**
+ * Collection of all Revertable registered with this registry
+ */
+ private final List<Revertable> revertables = new ArrayList<>();
+
public SnapshotRegistry(LogContext logContext) {
this.log = logContext.logger(SnapshotRegistry.class);
}
@@ -254,4 +259,22 @@ public class SnapshotRegistry {
public long latestEpoch() {
return head.prev().epoch();
}
+
+ /**
+ * Associate with this registry.
+ */
+ public void register(Revertable revertable) {
+ revertables.add(revertable);
+ }
+
+ /**
+ * Delete all snapshots and resets all of the Revertable object registered.
+ */
+ public void reset() {
+ deleteSnapshotsUpTo(LATEST_EPOCH);
+
+ for (Revertable revertable : revertables) {
+ revertable.reset();
+ }
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index 2f5d7be..cbd0a28 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -280,6 +280,7 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int expectedSize) {
super(expectedSize);
this.snapshotRegistry = snapshotRegistry;
+ snapshotRegistry.register(this);
}
int snapshottableSize(long epoch) {
@@ -452,4 +453,13 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
}
}
}
+
+ @Override
+ public void reset() {
+ Iterator<T> iter = snapshottableIterator(SnapshottableHashTable.LATEST_EPOCH);
+ while (iter.hasNext()) {
+ iter.next();
+ iter.remove();
+ }
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
index 6e02517..855e7ed 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
@@ -179,11 +179,7 @@ public class TimelineHashMap<K, V>
@Override
public void clear() {
- Iterator<TimelineHashMapEntry<K, V>> iter = snapshottableIterator(SnapshottableHashTable.LATEST_EPOCH);
- while (iter.hasNext()) {
- iter.next();
- iter.remove();
- }
+ reset();
}
final class KeySet extends AbstractSet<K> {
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java
index 73ac0e4..34efb10 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java
@@ -225,12 +225,7 @@ public class TimelineHashSet<T>
@Override
public void clear() {
- Iterator<TimelineHashSetEntry<T>> iter =
- snapshottableIterator(SnapshottableHashTable.LATEST_EPOCH);
- while (iter.hasNext()) {
- iter.next();
- iter.remove();
- }
+ reset();
}
@Override
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineInteger.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
index d28db49..d158890 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
@@ -26,8 +26,10 @@ import java.util.Iterator;
* This class requires external synchronization.
*/
public class TimelineInteger implements Revertable {
+ public static final int INIT = 0;
+
static class IntegerContainer implements Delta {
- private int value = 0;
+ private int value = INIT;
int value() {
return value;
@@ -48,7 +50,9 @@ public class TimelineInteger implements Revertable {
public TimelineInteger(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
- this.value = 0;
+ this.value = INIT;
+
+ snapshotRegistry.register(this);
}
public int get() {
@@ -96,6 +100,11 @@ public class TimelineInteger implements Revertable {
}
@Override
+ public void reset() {
+ set(INIT);
+ }
+
+ @Override
public int hashCode() {
return value;
}
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java
index 36a300f..9b401db 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java
@@ -26,8 +26,10 @@ import java.util.Iterator;
* This class requires external synchronization.
*/
public class TimelineLong implements Revertable {
+ public static final long INIT = 0;
+
static class LongContainer implements Delta {
- private long value = 0;
+ private long value = INIT;
long value() {
return value;
@@ -47,12 +49,10 @@ public class TimelineLong implements Revertable {
private long value;
public TimelineLong(SnapshotRegistry snapshotRegistry) {
- this(snapshotRegistry, 0L);
- }
-
- public TimelineLong(SnapshotRegistry snapshotRegistry, long value) {
this.snapshotRegistry = snapshotRegistry;
- this.value = value;
+ this.value = INIT;
+
+ snapshotRegistry.register(this);
}
public long get() {
@@ -100,6 +100,11 @@ public class TimelineLong implements Revertable {
}
@Override
+ public void reset() {
+ set(INIT);
+ }
+
+ @Override
public int hashCode() {
return ((int) value) ^ (int) (value >>> 32);
}
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
index ef9405cc..972ff58 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
@@ -18,6 +18,7 @@
package org.apache.kafka.timeline;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
@@ -207,6 +208,26 @@ public class SnapshottableHashTableTest {
assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE), E_1A, E_2A, E_3A);
}
+ @Test
+ public void testReset() {
+ SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
+ SnapshottableHashTable<TestElement> table =
+ new SnapshottableHashTable<>(registry, 1);
+ assertEquals(null, table.snapshottableAddOrReplace(E_1A));
+ assertEquals(null, table.snapshottableAddOrReplace(E_2A));
+ assertEquals(null, table.snapshottableAddOrReplace(E_3A));
+ registry.createSnapshot(0);
+ assertEquals(E_1A, table.snapshottableAddOrReplace(E_1B));
+ assertEquals(E_3A, table.snapshottableAddOrReplace(E_3B));
+ registry.createSnapshot(1);
+
+ registry.reset();
+
+ assertEquals(Collections.emptyList(), registry.epochsList());
+ // Check that the table is empty
+ assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE));
+ }
+
/**
* Assert that the given iterator contains the given elements, in any order.
* We compare using reference equality here, rather than object equality.
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/TimelineIntegerTest.java b/metadata/src/test/java/org/apache/kafka/timeline/TimelineIntegerTest.java
index c2a84c6..13a5d35 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/TimelineIntegerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/TimelineIntegerTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.timeline;
+import java.util.Collections;
+
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -70,4 +72,19 @@ public class TimelineIntegerTest {
registry.revertToSnapshot(2);
assertEquals(0, integer.get());
}
+
+ @Test
+ public void testReset() {
+ SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
+ TimelineInteger value = new TimelineInteger(registry);
+ registry.createSnapshot(2);
+ value.set(1);
+ registry.createSnapshot(3);
+ value.set(2);
+
+ registry.reset();
+
+ assertEquals(Collections.emptyList(), registry.epochsList());
+ assertEquals(TimelineInteger.INIT, value.get());
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/TimelineLongTest.java b/metadata/src/test/java/org/apache/kafka/timeline/TimelineLongTest.java
index 378c6c6..10ce566 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/TimelineLongTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/TimelineLongTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.timeline;
+import java.util.Collections;
+
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -70,4 +72,19 @@ public class TimelineLongTest {
registry.revertToSnapshot(2);
assertEquals(0L, value.get());
}
+
+ @Test
+ public void testReset() {
+ SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
+ TimelineLong value = new TimelineLong(registry);
+ registry.createSnapshot(2);
+ value.set(1L);
+ registry.createSnapshot(3);
+ value.set(2L);
+
+ registry.reset();
+
+ assertEquals(Collections.emptyList(), registry.epochsList());
+ assertEquals(TimelineLong.INIT, value.get());
+ }
}