You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2023/01/01 17:34:29 UTC

[incubator-tuweni] branch main updated: Add a global expiration listener

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/main by this push:
     new 437434be Add a global expiration listener
     new eb0598c3 Merge pull request #459 from atoulme/add_expiration_listener
437434be is described below

commit 437434bef32b42e51ffa1c8a11a6f4fd33063ca2
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Sun Jan 1 01:07:46 2023 -0800

    Add a global expiration listener
---
 .../org/apache/tuweni/concurrent/ExpiringMap.java  | 53 ++++++++++++++++------
 .../org/apache/tuweni/concurrent/ExpiringSet.java  | 29 +++++++++---
 .../apache/tuweni/concurrent/ExpiringMapTest.java  | 18 +++++++-
 .../apache/tuweni/concurrent/ExpiringSetTest.java  | 12 ++++-
 4 files changed, 88 insertions(+), 24 deletions(-)

diff --git a/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringMap.java b/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringMap.java
index 8fc5c0d6..592efd47 100644
--- a/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringMap.java
+++ b/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringMap.java
@@ -62,11 +62,14 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
   private final LongSupplier currentTimeSupplier;
   private final Long defaultTimeout;
 
+  @Nullable
+  private final BiConsumer<K, V> globalExpiryListener;
+
   /**
    * Construct an empty map.
    */
   public ExpiringMap() {
-    this(System::currentTimeMillis, Long.MAX_VALUE);
+    this(System::currentTimeMillis, Long.MAX_VALUE, null);
   }
 
   /**
@@ -75,12 +78,23 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
    * @param defaultTimeout the default timeout in milliseconds
    */
   public ExpiringMap(Long defaultTimeout) {
-    this(System::currentTimeMillis, defaultTimeout);
+    this(System::currentTimeMillis, defaultTimeout, null);
   }
 
-  ExpiringMap(LongSupplier currentTimeSupplier, Long defaultTimeout) {
+  /**
+   * Construct a map with a default timeout value and a global expiration listener.
+   *
+   * @param defaultTimeout the default timeout in milliseconds
+   * @param expiryListener a listener that will be called for each entry expiration
+   */
+  public ExpiringMap(Long defaultTimeout, BiConsumer<K, V> expiryListener) {
+    this(System::currentTimeMillis, defaultTimeout, expiryListener);
+  }
+
+  ExpiringMap(LongSupplier currentTimeSupplier, Long defaultTimeout, BiConsumer<K, V> expiryListener) {
     this.currentTimeSupplier = currentTimeSupplier;
     this.defaultTimeout = defaultTimeout;
+    this.globalExpiryListener = expiryListener;
   }
 
   @Nullable
@@ -130,7 +144,8 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
     requireNonNull(key);
     requireNonNull(value);
     purgeExpired();
-    ExpiringEntry<K, V> oldEntry = storage.put(key, new ExpiringEntry<>(key, value, defaultTimeout, null));
+    ExpiringEntry<K, V> oldEntry =
+        storage.put(key, new ExpiringEntry<>(key, value, defaultTimeout, globalExpiryListener));
     return (oldEntry == null) ? null : oldEntry.value;
   }
 
@@ -146,7 +161,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
    */
   @Nullable
   public V put(K key, V value, long expiry) {
-    return put(key, value, expiry, null);
+    return put(key, value, expiry, globalExpiryListener);
   }
 
   /**
@@ -179,7 +194,8 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
       return previous;
     }
 
-    ExpiringEntry<K, V> newEntry = new ExpiringEntry<>(key, value, expiry, expiryListener);
+    ExpiringEntry<K, V> newEntry =
+        new ExpiringEntry<>(key, value, expiry, expiryListener == null ? globalExpiryListener : expiryListener);
     ExpiringEntry<K, V> oldEntry = storage.put(key, newEntry);
     expiryQueue.offer(newEntry);
     if (oldEntry != null && oldEntry.expiry < Long.MAX_VALUE) {
@@ -193,7 +209,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
     requireNonNull(m);
     purgeExpired();
     for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
-      storage.put(e.getKey(), new ExpiringEntry<>(e.getKey(), e.getValue(), defaultTimeout, null));
+      storage.put(e.getKey(), new ExpiringEntry<>(e.getKey(), e.getValue(), defaultTimeout, globalExpiryListener));
     }
   }
 
@@ -203,7 +219,8 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
     requireNonNull(key);
     requireNonNull(value);
     purgeExpired();
-    ExpiringEntry<K, V> oldEntry = storage.putIfAbsent(key, new ExpiringEntry<>(key, value, defaultTimeout, null));
+    ExpiringEntry<K, V> oldEntry =
+        storage.putIfAbsent(key, new ExpiringEntry<>(key, value, defaultTimeout, globalExpiryListener));
     return (oldEntry == null) ? null : oldEntry.value;
   }
 
@@ -218,7 +235,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
    */
   @Nullable
   public V putIfAbsent(K key, V value, long expiry) {
-    return putIfAbsent(key, value, expiry, null);
+    return putIfAbsent(key, value, expiry, globalExpiryListener);
   }
 
   /**
@@ -250,7 +267,8 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
       return previous;
     }
 
-    ExpiringEntry<K, V> newEntry = new ExpiringEntry<>(key, value, expiry, expiryListener);
+    ExpiringEntry<K, V> newEntry =
+        new ExpiringEntry<>(key, value, expiry, expiryListener == null ? globalExpiryListener : expiryListener);
     ExpiringEntry<K, V> oldEntry = storage.putIfAbsent(key, newEntry);
     if (oldEntry == null) {
       expiryQueue.offer(newEntry);
@@ -292,7 +310,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
         expiryQueue.remove(oldEntry);
       }
       V newValue = remappingFunction.apply(k, oldEntry.value);
-      return (newValue == null) ? null : new ExpiringEntry<>(k, newValue, defaultTimeout, null);
+      return (newValue == null) ? null : new ExpiringEntry<>(k, newValue, defaultTimeout, globalExpiryListener);
     });
     return (newEntry == null) ? null : newEntry.value;
   }
@@ -305,14 +323,15 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
             expiryQueue.remove(oldEntry);
           }
           V newValue = remappingFunction.apply(oldEntry.value, newEntry.value);
-          return (newValue == null) ? null : new ExpiringEntry<>(key, newValue, defaultTimeout, null);
+          return (newValue == null) ? null : new ExpiringEntry<>(key, newValue, defaultTimeout, globalExpiryListener);
         });
     return (entry == null) ? null : entry.value;
   }
 
   @Override
   public V replace(K key, V value) {
-    ExpiringEntry<K, V> oldEntry = storage.replace(key, new ExpiringEntry<>(key, value, defaultTimeout, null));
+    ExpiringEntry<K, V> oldEntry =
+        storage.replace(key, new ExpiringEntry<>(key, value, defaultTimeout, globalExpiryListener));
     if (oldEntry != null) {
       if (oldEntry.expiry < Long.MAX_VALUE) {
         expiryQueue.remove(oldEntry);
@@ -331,7 +350,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
         if (oldEntry.expiry < Long.MAX_VALUE) {
           expiryQueue.remove(oldEntry);
         }
-        return new ExpiringEntry<>(k, newValue, defaultTimeout, null);
+        return new ExpiringEntry<>(k, newValue, defaultTimeout, globalExpiryListener);
       }
       return oldEntry;
     });
@@ -344,7 +363,11 @@ public final class ExpiringMap<K, V> implements Map<K, V> {
       if (oldEntry.expiry < Long.MAX_VALUE) {
         expiryQueue.remove(oldEntry);
       }
-      return new ExpiringEntry<>(k, requireNonNull(function.apply(k, oldEntry.value)), defaultTimeout, null);
+      return new ExpiringEntry<>(
+          k,
+          requireNonNull(function.apply(k, oldEntry.value)),
+          defaultTimeout,
+          globalExpiryListener);
     });
   }
 
diff --git a/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringSet.java b/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringSet.java
index 6974d240..97703e49 100644
--- a/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringSet.java
+++ b/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringSet.java
@@ -34,6 +34,8 @@ import javax.annotation.Nullable;
  */
 public final class ExpiringSet<E> implements Set<E> {
 
+  private final Consumer<E> globalExpiryListener;
+
   // Uses object equality, to ensure uniqueness as a value in the storage map
   private static final class ExpiringEntry<E> implements Comparable<ExpiringEntry<E>> {
     private E element;
@@ -64,22 +66,33 @@ public final class ExpiringSet<E> implements Set<E> {
    * @param evictionTimeout the default eviction timeout for entries in milliseconds.
    */
   public ExpiringSet(long evictionTimeout) {
-    this(evictionTimeout, System::currentTimeMillis);
+    this(evictionTimeout, System::currentTimeMillis, null);
+  }
+
+  /**
+   * Construct an empty expiring set.
+   *
+   * @param evictionTimeout the default eviction timeout for entries in milliseconds.
+   * @param expiryListener a listener that will be called for each entry expiration
+   */
+  public ExpiringSet(long evictionTimeout, Consumer<E> expiryListener) {
+    this(evictionTimeout, System::currentTimeMillis, expiryListener);
   }
 
   /**
    * Construct an empty set.
    */
   public ExpiringSet() {
-    this(Long.MAX_VALUE, System::currentTimeMillis);
+    this(Long.MAX_VALUE, System::currentTimeMillis, null);
   }
 
-  ExpiringSet(long evictionTimeout, LongSupplier currentTimeSupplier) {
+  ExpiringSet(long evictionTimeout, LongSupplier currentTimeSupplier, Consumer<E> expiryListener) {
     if (evictionTimeout <= 0) {
       throw new IllegalArgumentException("Invalid eviction timeout " + evictionTimeout);
     }
     this.evictionTimeout = evictionTimeout;
     this.currentTimeSupplier = currentTimeSupplier;
+    this.globalExpiryListener = expiryListener;
   }
 
   @Override
@@ -135,7 +148,7 @@ public final class ExpiringSet<E> implements Set<E> {
     requireNonNull(e);
     purgeExpired();
     ExpiringEntry<E> oldEntry =
-        storage.put(e, new ExpiringEntry<>(e, currentTimeSupplier.getAsLong() + evictionTimeout, null));
+        storage.put(e, new ExpiringEntry<>(e, currentTimeSupplier.getAsLong() + evictionTimeout, globalExpiryListener));
     return oldEntry == null;
   }
 
@@ -148,7 +161,7 @@ public final class ExpiringSet<E> implements Set<E> {
    * @return {@code true} if this set did not already contain the specified element.
    */
   public boolean add(E element, long expiry) {
-    return add(element, expiry, null);
+    return add(element, expiry, globalExpiryListener);
   }
 
   /**
@@ -174,7 +187,8 @@ public final class ExpiringSet<E> implements Set<E> {
       return removedPrevious;
     }
 
-    ExpiringEntry<E> newEntry = new ExpiringEntry<>(element, expiry, expiryListener);
+    ExpiringEntry<E> newEntry =
+        new ExpiringEntry<>(element, expiry, expiryListener == null ? globalExpiryListener : expiryListener);
     ExpiringEntry<E> oldEntry = storage.put(element, newEntry);
     expiryQueue.offer(newEntry);
     if (oldEntry != null && oldEntry.expiry < Long.MAX_VALUE) {
@@ -189,7 +203,8 @@ public final class ExpiringSet<E> implements Set<E> {
     purgeExpired();
     boolean noOldElements = true;
     for (E element : c) {
-      ExpiringEntry<E> oldEntry = storage.put(element, new ExpiringEntry<>(element, Long.MAX_VALUE, null));
+      ExpiringEntry<E> oldEntry =
+          storage.put(element, new ExpiringEntry<>(element, Long.MAX_VALUE, globalExpiryListener));
       if (oldEntry != null) {
         noOldElements = false;
       }
diff --git a/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringMapTest.java b/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringMapTest.java
index df981974..afca86d8 100644
--- a/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringMapTest.java
+++ b/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringMapTest.java
@@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -32,7 +33,7 @@ class ExpiringMapTest {
   @BeforeEach
   void setup() {
     currentTime = Instant.now();
-    map = new ExpiringMap<>(() -> currentTime.toEpochMilli(), Long.MAX_VALUE);
+    map = new ExpiringMap<>(() -> currentTime.toEpochMilli(), Long.MAX_VALUE, null);
   }
 
   @Test
@@ -75,6 +76,21 @@ class ExpiringMapTest {
     assertNull(map.remove(1));
   }
 
+  @Test
+  void addGlobalExpiryListener() {
+    AtomicReference<String> key = new AtomicReference<>();
+    AtomicReference<String> value = new AtomicReference<>();
+    ExpiringMap<String, String> listeningMap =
+        new ExpiringMap<>(() -> currentTime.toEpochMilli(), 1L, (String k, String v) -> {
+          key.set(k);
+          value.set(v);
+        });
+    listeningMap.put("foo", "bar", -1);
+    assertEquals("foo", key.get());
+    assertEquals("bar", value.get());
+    assertEquals(0, listeningMap.size());
+  }
+
   @Test
   void itemIsExpiredAfterExpiry() {
     Instant futureTime = currentTime.plusSeconds(10);
diff --git a/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringSetTest.java b/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringSetTest.java
index 541367bd..c6a47066 100644
--- a/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringSetTest.java
+++ b/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringSetTest.java
@@ -17,6 +17,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.time.Instant;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -29,7 +30,7 @@ class ExpiringSetTest {
   @BeforeEach
   void setup() {
     currentTime = Instant.now();
-    set = new ExpiringSet<>(Long.MAX_VALUE, () -> currentTime.toEpochMilli());
+    set = new ExpiringSet<>(Long.MAX_VALUE, () -> currentTime.toEpochMilli(), null);
   }
 
   @Test
@@ -62,6 +63,15 @@ class ExpiringSetTest {
     assertFalse(set.remove("foo"));
   }
 
+  @Test
+  void addGlobalExpiryListener() {
+    AtomicReference<String> key = new AtomicReference<>();
+    ExpiringSet<String> listeningSet = new ExpiringSet<>(1L, () -> currentTime.toEpochMilli(), key::set);
+    listeningSet.add("foo", -1);
+    assertEquals("foo", key.get());
+    assertEquals(0, listeningSet.size());
+  }
+
   @Test
   void itemIsMissingAfterExpiry() {
     Instant futureTime = currentTime.plusSeconds(10);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org