You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2023/01/01 17:34:29 UTC
[incubator-tuweni] branch main updated: Add a global expiration listener
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push:
new 437434be Add a global expiration listener
new eb0598c3 Merge pull request #459 from atoulme/add_expiration_listener
437434be is described below
commit 437434bef32b42e51ffa1c8a11a6f4fd33063ca2
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Sun Jan 1 01:07:46 2023 -0800
Add a global expiration listener
---
.../org/apache/tuweni/concurrent/ExpiringMap.java | 53 ++++++++++++++++------
.../org/apache/tuweni/concurrent/ExpiringSet.java | 29 +++++++++---
.../apache/tuweni/concurrent/ExpiringMapTest.java | 18 +++++++-
.../apache/tuweni/concurrent/ExpiringSetTest.java | 12 ++++-
4 files changed, 88 insertions(+), 24 deletions(-)
diff --git a/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringMap.java b/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringMap.java
index 8fc5c0d6..592efd47 100644
--- a/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringMap.java
+++ b/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringMap.java
@@ -62,11 +62,14 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
private final LongSupplier currentTimeSupplier;
private final Long defaultTimeout;
+ @Nullable
+ private final BiConsumer<K, V> globalExpiryListener;
+
/**
* Construct an empty map.
*/
public ExpiringMap() {
- this(System::currentTimeMillis, Long.MAX_VALUE);
+ this(System::currentTimeMillis, Long.MAX_VALUE, null);
}
/**
@@ -75,12 +78,23 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
* @param defaultTimeout the default timeout in milliseconds
*/
public ExpiringMap(Long defaultTimeout) {
- this(System::currentTimeMillis, defaultTimeout);
+ this(System::currentTimeMillis, defaultTimeout, null);
}
- ExpiringMap(LongSupplier currentTimeSupplier, Long defaultTimeout) {
+ /**
+ * Construct a map with a default timeout value and a global expiration listener.
+ *
+ * @param defaultTimeout the default timeout in milliseconds
+ * @param expiryListener a listener called when an entry without its own listener expires, may be {@code null}
+ */
+ public ExpiringMap(Long defaultTimeout, BiConsumer<K, V> expiryListener) {
+ this(System::currentTimeMillis, defaultTimeout, expiryListener);
+ }
+
+ ExpiringMap(LongSupplier currentTimeSupplier, Long defaultTimeout, BiConsumer<K, V> expiryListener) {
this.currentTimeSupplier = currentTimeSupplier;
this.defaultTimeout = defaultTimeout;
+ this.globalExpiryListener = expiryListener;
}
@Nullable
@@ -130,7 +144,8 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
requireNonNull(key);
requireNonNull(value);
purgeExpired();
- ExpiringEntry<K, V> oldEntry = storage.put(key, new ExpiringEntry<>(key, value, defaultTimeout, null));
+ ExpiringEntry<K, V> oldEntry =
+ storage.put(key, new ExpiringEntry<>(key, value, defaultTimeout, globalExpiryListener));
return (oldEntry == null) ? null : oldEntry.value;
}
@@ -146,7 +161,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
*/
@Nullable
public V put(K key, V value, long expiry) {
- return put(key, value, expiry, null);
+ return put(key, value, expiry, globalExpiryListener);
}
/**
@@ -179,7 +194,8 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
return previous;
}
- ExpiringEntry<K, V> newEntry = new ExpiringEntry<>(key, value, expiry, expiryListener);
+ ExpiringEntry<K, V> newEntry =
+ new ExpiringEntry<>(key, value, expiry, expiryListener == null ? globalExpiryListener : expiryListener);
ExpiringEntry<K, V> oldEntry = storage.put(key, newEntry);
expiryQueue.offer(newEntry);
if (oldEntry != null && oldEntry.expiry < Long.MAX_VALUE) {
@@ -193,7 +209,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
requireNonNull(m);
purgeExpired();
for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
- storage.put(e.getKey(), new ExpiringEntry<>(e.getKey(), e.getValue(), defaultTimeout, null));
+ storage.put(e.getKey(), new ExpiringEntry<>(e.getKey(), e.getValue(), defaultTimeout, globalExpiryListener));
}
}
@@ -203,7 +219,8 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
requireNonNull(key);
requireNonNull(value);
purgeExpired();
- ExpiringEntry<K, V> oldEntry = storage.putIfAbsent(key, new ExpiringEntry<>(key, value, defaultTimeout, null));
+ ExpiringEntry<K, V> oldEntry =
+ storage.putIfAbsent(key, new ExpiringEntry<>(key, value, defaultTimeout, globalExpiryListener));
return (oldEntry == null) ? null : oldEntry.value;
}
@@ -218,7 +235,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
*/
@Nullable
public V putIfAbsent(K key, V value, long expiry) {
- return putIfAbsent(key, value, expiry, null);
+ return putIfAbsent(key, value, expiry, globalExpiryListener);
}
/**
@@ -250,7 +267,8 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
return previous;
}
- ExpiringEntry<K, V> newEntry = new ExpiringEntry<>(key, value, expiry, expiryListener);
+ ExpiringEntry<K, V> newEntry =
+ new ExpiringEntry<>(key, value, expiry, expiryListener == null ? globalExpiryListener : expiryListener);
ExpiringEntry<K, V> oldEntry = storage.putIfAbsent(key, newEntry);
if (oldEntry == null) {
expiryQueue.offer(newEntry);
@@ -292,7 +310,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
expiryQueue.remove(oldEntry);
}
V newValue = remappingFunction.apply(k, oldEntry.value);
- return (newValue == null) ? null : new ExpiringEntry<>(k, newValue, defaultTimeout, null);
+ return (newValue == null) ? null : new ExpiringEntry<>(k, newValue, defaultTimeout, globalExpiryListener);
});
return (newEntry == null) ? null : newEntry.value;
}
@@ -305,14 +323,15 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
expiryQueue.remove(oldEntry);
}
V newValue = remappingFunction.apply(oldEntry.value, newEntry.value);
- return (newValue == null) ? null : new ExpiringEntry<>(key, newValue, defaultTimeout, null);
+ return (newValue == null) ? null : new ExpiringEntry<>(key, newValue, defaultTimeout, globalExpiryListener);
});
return (entry == null) ? null : entry.value;
}
@Override
public V replace(K key, V value) {
- ExpiringEntry<K, V> oldEntry = storage.replace(key, new ExpiringEntry<>(key, value, defaultTimeout, null));
+ ExpiringEntry<K, V> oldEntry =
+ storage.replace(key, new ExpiringEntry<>(key, value, defaultTimeout, globalExpiryListener));
if (oldEntry != null) {
if (oldEntry.expiry < Long.MAX_VALUE) {
expiryQueue.remove(oldEntry);
@@ -331,7 +350,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
if (oldEntry.expiry < Long.MAX_VALUE) {
expiryQueue.remove(oldEntry);
}
- return new ExpiringEntry<>(k, newValue, defaultTimeout, null);
+ return new ExpiringEntry<>(k, newValue, defaultTimeout, globalExpiryListener);
}
return oldEntry;
});
@@ -344,7 +363,11 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
if (oldEntry.expiry < Long.MAX_VALUE) {
expiryQueue.remove(oldEntry);
}
- return new ExpiringEntry<>(k, requireNonNull(function.apply(k, oldEntry.value)), defaultTimeout, null);
+ return new ExpiringEntry<>(
+ k,
+ requireNonNull(function.apply(k, oldEntry.value)),
+ defaultTimeout,
+ globalExpiryListener);
});
}
diff --git a/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringSet.java b/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringSet.java
index 6974d240..97703e49 100644
--- a/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringSet.java
+++ b/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringSet.java
@@ -34,6 +34,8 @@ import javax.annotation.Nullable;
*/
public final class ExpiringSet<E> implements Set<E> {
+ private final Consumer<E> globalExpiryListener;
+
// Uses object equality, to ensure uniqueness as a value in the storage map
private static final class ExpiringEntry<E> implements Comparable<ExpiringEntry<E>> {
private E element;
@@ -64,22 +66,33 @@ public final class ExpiringSet<E> implements Set<E> {
* @param evictionTimeout the default eviction timeout for entries in milliseconds.
*/
public ExpiringSet(long evictionTimeout) {
- this(evictionTimeout, System::currentTimeMillis);
+ this(evictionTimeout, System::currentTimeMillis, null);
+ }
+
+ /**
+ * Construct an empty expiring set.
+ *
+ * @param evictionTimeout the default eviction timeout for entries in milliseconds.
+ * @param expiryListener a listener called when an entry without its own listener expires, may be {@code null}
+ */
+ public ExpiringSet(long evictionTimeout, Consumer<E> expiryListener) {
+ this(evictionTimeout, System::currentTimeMillis, expiryListener);
}
/**
* Construct an empty set.
*/
public ExpiringSet() {
- this(Long.MAX_VALUE, System::currentTimeMillis);
+ this(Long.MAX_VALUE, System::currentTimeMillis, null);
}
- ExpiringSet(long evictionTimeout, LongSupplier currentTimeSupplier) {
+ ExpiringSet(long evictionTimeout, LongSupplier currentTimeSupplier, Consumer<E> expiryListener) {
if (evictionTimeout <= 0) {
throw new IllegalArgumentException("Invalid eviction timeout " + evictionTimeout);
}
this.evictionTimeout = evictionTimeout;
this.currentTimeSupplier = currentTimeSupplier;
+ this.globalExpiryListener = expiryListener;
}
@Override
@@ -135,7 +148,7 @@ public final class ExpiringSet<E> implements Set<E> {
requireNonNull(e);
purgeExpired();
ExpiringEntry<E> oldEntry =
- storage.put(e, new ExpiringEntry<>(e, currentTimeSupplier.getAsLong() + evictionTimeout, null));
+ storage.put(e, new ExpiringEntry<>(e, currentTimeSupplier.getAsLong() + evictionTimeout, globalExpiryListener));
return oldEntry == null;
}
@@ -148,7 +161,7 @@ public final class ExpiringSet<E> implements Set<E> {
* @return {@code true} if this set did not already contain the specified element.
*/
public boolean add(E element, long expiry) {
- return add(element, expiry, null);
+ return add(element, expiry, globalExpiryListener);
}
/**
@@ -174,7 +187,8 @@ public final class ExpiringSet<E> implements Set<E> {
return removedPrevious;
}
- ExpiringEntry<E> newEntry = new ExpiringEntry<>(element, expiry, expiryListener);
+ ExpiringEntry<E> newEntry =
+ new ExpiringEntry<>(element, expiry, expiryListener == null ? globalExpiryListener : expiryListener);
ExpiringEntry<E> oldEntry = storage.put(element, newEntry);
expiryQueue.offer(newEntry);
if (oldEntry != null && oldEntry.expiry < Long.MAX_VALUE) {
@@ -189,7 +203,8 @@ public final class ExpiringSet<E> implements Set<E> {
purgeExpired();
boolean noOldElements = true;
for (E element : c) {
- ExpiringEntry<E> oldEntry = storage.put(element, new ExpiringEntry<>(element, Long.MAX_VALUE, null));
+ ExpiringEntry<E> oldEntry =
+ storage.put(element, new ExpiringEntry<>(element, Long.MAX_VALUE, globalExpiryListener));
if (oldEntry != null) {
noOldElements = false;
}
diff --git a/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringMapTest.java b/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringMapTest.java
index df981974..afca86d8 100644
--- a/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringMapTest.java
+++ b/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringMapTest.java
@@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -32,7 +33,7 @@ class ExpiringMapTest {
@BeforeEach
void setup() {
currentTime = Instant.now();
- map = new ExpiringMap<>(() -> currentTime.toEpochMilli(), Long.MAX_VALUE);
+ map = new ExpiringMap<>(() -> currentTime.toEpochMilli(), Long.MAX_VALUE, null);
}
@Test
@@ -75,6 +76,21 @@ class ExpiringMapTest {
assertNull(map.remove(1));
}
+ @Test
+ void addGlobalExpiryListener() {
+ AtomicReference<String> key = new AtomicReference<>();
+ AtomicReference<String> value = new AtomicReference<>();
+ ExpiringMap<String, String> listeningMap =
+ new ExpiringMap<>(() -> currentTime.toEpochMilli(), 1L, (String k, String v) -> {
+ key.set(k);
+ value.set(v);
+ });
+ listeningMap.put("foo", "bar", -1);
+ assertEquals("foo", key.get());
+ assertEquals("bar", value.get());
+ assertEquals(0, listeningMap.size());
+ }
+
@Test
void itemIsExpiredAfterExpiry() {
Instant futureTime = currentTime.plusSeconds(10);
diff --git a/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringSetTest.java b/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringSetTest.java
index 541367bd..c6a47066 100644
--- a/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringSetTest.java
+++ b/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringSetTest.java
@@ -17,6 +17,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -29,7 +30,7 @@ class ExpiringSetTest {
@BeforeEach
void setup() {
currentTime = Instant.now();
- set = new ExpiringSet<>(Long.MAX_VALUE, () -> currentTime.toEpochMilli());
+ set = new ExpiringSet<>(Long.MAX_VALUE, () -> currentTime.toEpochMilli(), null);
}
@Test
@@ -62,6 +63,15 @@ class ExpiringSetTest {
assertFalse(set.remove("foo"));
}
+ @Test
+ void addGlobalExpiryListener() {
+ AtomicReference<String> key = new AtomicReference<>();
+ ExpiringSet<String> listeningSet = new ExpiringSet<>(1L, () -> currentTime.toEpochMilli(), key::set);
+ listeningSet.add("foo", -1);
+ assertEquals("foo", key.get());
+ assertEquals(0, listeningSet.size());
+ }
+
@Test
void itemIsMissingAfterExpiry() {
Instant futureTime = currentTime.plusSeconds(10);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org