You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/01/15 09:49:32 UTC

[sling-org-apache-sling-distribution-journal] branch master updated (be33d04 -> b7c5e24)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git.


    from be33d04  GRANITE-28520 - Make sure we announce the correct service interfaces
     new 58ec625  SLING-8996 - Fix error in clear callback. Improve test and logging
     new b7c5e24  SLING-8997 - Make sure shutdown of ExponentialBackOff happens quickly

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../journal/impl/queue/impl/PubQueue.java          |  10 +-
 .../impl/queue/impl/PubQueueProviderImpl.java      |   3 +-
 .../journal/impl/shared/ExponentialBackOff.java    |   9 +
 .../journal/impl/subscriber/CommandPoller.java     |   5 +-
 .../journal/impl/queue/impl/PubQueueTest.java      | 198 ++++++++++++---------
 .../impl/shared/ExponentialBackoffTest.java        |  14 +-
 6 files changed, 149 insertions(+), 90 deletions(-)


[sling-org-apache-sling-distribution-journal] 01/02: SLING-8996 - Fix error in clear callback. Improve test and logging

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 58ec625d1e91f208e2dd05178384b1ff05d3d56d
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Jan 15 10:43:42 2020 +0100

    SLING-8996 - Fix error in clear callback. Improve test and logging
---
 .../journal/impl/queue/impl/PubQueue.java          |  10 +-
 .../impl/queue/impl/PubQueueProviderImpl.java      |   3 +-
 .../journal/impl/subscriber/CommandPoller.java     |   5 +-
 .../journal/impl/queue/impl/PubQueueTest.java      | 198 ++++++++++++---------
 4 files changed, 127 insertions(+), 89 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index 93f0dca..9bfd4fd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -47,9 +47,13 @@ import org.apache.sling.distribution.queue.DistributionQueueState;
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.apache.sling.distribution.queue.DistributionQueueType;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @ParametersAreNonnullByDefault
 public class PubQueue implements DistributionQueue {
+    
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
 
     private final String queueName;
 
@@ -134,6 +138,7 @@ public class PubQueue implements DistributionQueue {
          * Until then, the REMOVABLE capability is provided
          * but only allows to remove the head of the queue.
          */
+        log.info("Removing queue entry {}", entryId);
         DistributionQueueEntry headEntry = getHead();
         if (headEntry != null) {
             if (headEntry.getId().equals(entryId)) {
@@ -159,6 +164,7 @@ public class PubQueue implements DistributionQueue {
          * which clears from the head entry to the entry
          * provided with the max offset (tailEntry).
          */
+        log.info("Removing queue entries {}", entryIds);
         Optional<String> tailEntryId = entryIds.stream()
                 .max((e1, e2) -> compare(EntryUtil.entryOffset(e1), EntryUtil.entryOffset(e2)));
         return (tailEntryId.isPresent())
@@ -214,6 +220,7 @@ public class PubQueue implements DistributionQueue {
     }
 
     private Iterable<DistributionQueueEntry> clear(String tailEntryId) {
+        log.info("Clearing up to tail queue entry {}", tailEntryId);
         List<DistributionQueueEntry> removed = new ArrayList<>();
         for (DistributionQueueEntry entry : getEntries(0, -1)) {
             removed.add(entry);
@@ -229,7 +236,8 @@ public class PubQueue implements DistributionQueue {
         if (clearCallback == null) {
             throw new UnsupportedOperationException();
         }
-        clearCallback.clear(EntryUtil.entryOffset(tailEntry.getId()));
+        long offset = EntryUtil.entryOffset(tailEntry.getId());
+        clearCallback.clear(offset);
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index 8b253b7..3b7feab 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -108,7 +108,7 @@ public class PubQueueProviderImpl implements PubQueueProvider {
     @Override
     public DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable) {
         OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
-        ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, minOffset);
+        ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, offset);
         ClearCallback callback = editable ? editableCallback : null;
         return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
     }
@@ -152,6 +152,7 @@ public class PubQueueProviderImpl implements PubQueueProvider {
                 .setSubAgentName(subAgentName)
                 .setClearCommand(clearCommand)
                 .build();
+        LOG.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subSlingId, subAgentName, offset);
         sender.send(topics.getCommandTopic(), commandMessage);
     }
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index e56828f..f9d047b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -81,8 +81,9 @@ public class CommandPoller implements Closeable {
     }
 
     private void handleClearCommand(long offset) {
-        updateClearOffsetIfLarger(offset);
-        LOG.info("Handled clear command for offset {}", offset);
+        long oldOffset = clearOffset.get();
+        long newOffset = updateClearOffsetIfLarger(offset);
+        LOG.info("Handled clear command for offset {}. Old clear offset was {}, new clear offset is {}.", offset, oldOffset, newOffset);
     }
 
     private long updateClearOffsetIfLarger(long offset) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
index 25c15bf..9872ad9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
@@ -18,134 +18,130 @@
  */
 package org.apache.sling.distribution.journal.impl.queue.impl;
 
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_PARTITION;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TOPIC;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueType;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.junit.Before;
 import org.junit.Test;
-
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.*;
-import static java.util.Collections.*;
-import static org.junit.Assert.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("serial")
 public class PubQueueTest {
-
     private static final String TOPIC = "topic";
-
     private static final String PARTITION = "0";
-
     private static final String QUEUE_NAME = "queueName";
-
-    private static final String PACKAGE_ID_1 = "package-1";
-
-    private static final String PACKAGE_ID_2 = "package-2";
-
-    private static final String PACKAGE_ID_3 = "package-3";
-
-    private static final ClearCallback NO_OP = (offset) -> {};
-
-    private static final OffsetQueue<DistributionQueueItem> EMPTY_QUEUE = new OffsetQueueImpl<>();
-
-    private static final OffsetQueue<DistributionQueueItem> THREE_ENTRY_QUEUE = new OffsetQueueImpl<>();
-
-    static {
-
-        THREE_ENTRY_QUEUE.putItem(100, new DistributionQueueItem(PACKAGE_ID_1, new HashMap<String, Object>(){{
-            put(RECORD_TOPIC, TOPIC);
-            put(RECORD_PARTITION, PARTITION);
-            put(RECORD_OFFSET, 100);
-            put(RECORD_TIMESTAMP, 1541538150582L);
-        }}));
-
-        THREE_ENTRY_QUEUE.putItem(200, new DistributionQueueItem(PACKAGE_ID_2, new HashMap<String, Object>(){{
-            put(RECORD_TOPIC, TOPIC);
-            put(RECORD_PARTITION, PARTITION);
-            put(RECORD_OFFSET, 200);
-            put(RECORD_TIMESTAMP, 1541538150584L);
-        }}));
-
-        THREE_ENTRY_QUEUE.putItem(300, new DistributionQueueItem(PACKAGE_ID_3, new HashMap<String, Object>(){{
-            put(RECORD_TOPIC, TOPIC);
-            put(RECORD_PARTITION, PARTITION);
-            put(RECORD_OFFSET, 300);
-            put(RECORD_TIMESTAMP, 1541538150586L);
-        }}));
+    private static final String PACKAGE_ID_PREFIX = "package-";
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+    private Semaphore invoked = new Semaphore(0);
+    private long lastClearOffset = 0l;
+    private OffsetQueue<DistributionQueueItem> offsetQueue;
+    private PubQueue queue;
+
+    @Before
+    public void before () {
+        offsetQueue = new OffsetQueueImpl<>();
+        queue = pubQueue(offsetQueue);
+        addEntries();
     }
 
     @Test
     public void testGetName() throws Exception {
-        assertEquals(QUEUE_NAME, new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, NO_OP).getName());
+        assertEquals(QUEUE_NAME, queue.getName());
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testAdd() throws Exception {
-        DistributionQueueItem queueItem = new DistributionQueueItem(PACKAGE_ID_1, emptyMap());
-        new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, NO_OP).add(queueItem);
+        queue.add(queueItem(1));
+    }
+    
+    @Test
+    public void testGetHeadEmpty() throws Exception {
+        assertNull(queue.getHead());
     }
 
     @Test
     public void testGetHead() throws Exception {
-        assertNull(pubQueue(EMPTY_QUEUE).getHead());
-        DistributionQueueEntry headEntry = pubQueue(THREE_ENTRY_QUEUE).getHead();
+        addEntries();
+        
+        DistributionQueueEntry headEntry = queue.getHead();
+
         assertNotNull(headEntry);
-        assertEquals(PACKAGE_ID_1, headEntry.getItem().getPackageId());
+        assertEquals(packageId(1), headEntry.getItem().getPackageId());
     }
 
     @Test
     public void testGetItems() throws Exception {
-        Iterator<DistributionQueueEntry> entries = pubQueue(THREE_ENTRY_QUEUE).getEntries(1, 2).iterator();
+        addEntries();
+        
+        Iterator<DistributionQueueEntry> entries = queue.getEntries(1, 2).iterator();
+        
         assertNotNull(entries);
         DistributionQueueEntry entry1 = entries.next();
         assertNotNull(entry1);
-        assertEquals(PACKAGE_ID_2, entry1.getItem().getPackageId());
+        assertEquals(packageId(2), entry1.getItem().getPackageId());
         DistributionQueueEntry entry2 = entries.next();
-        assertEquals(PACKAGE_ID_3, entry2.getItem().getPackageId());
+        assertEquals(packageId(3), entry2.getItem().getPackageId());
     }
 
     @Test
     public void testGetItem() throws Exception {
+        addEntries();
+        
         String entryId = TOPIC + "-" + PARTITION + "@" + 200;
-        DistributionQueueEntry queueEntry = pubQueue(THREE_ENTRY_QUEUE).getEntry(entryId);
+        DistributionQueueEntry queueEntry = queue.getEntry(entryId);
+        
         assertNotNull(queueEntry);
-        assertEquals(PACKAGE_ID_2, queueEntry.getItem().getPackageId());
+        assertEquals(packageId(2), queueEntry.getItem().getPackageId());
     }
 
     @Test
     public void testRemoveHead() throws Exception {
-        final Semaphore invoked = new Semaphore(1);
-        DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release());
-        String headEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getHeadItem());
+        addEntries();
+        
+        String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem());
         DistributionQueueEntry removed = queue.remove(headEntryId);
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
+        
+        assertClearCallbackInvoked();
         assertEquals(headEntryId, removed.getId());
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testRemoveRandomItemFails() throws Exception {
-        DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE);
-        String randomEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getItem(200));
+        addEntries();
+        
+        String randomEntryId = EntryUtil.entryId(offsetQueue.getItem(200));
         queue.remove(randomEntryId);
     }
 
     @Test
     public void testRemoveSetOfRandomItemsWillClear() throws Exception {
-        final Semaphore invoked = new Semaphore(2);
-        DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release());
-        String headEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getHeadItem());
-        String randomEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getItem(200));
+        addEntries();
+        String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem());
+        String randomEntryId = EntryUtil.entryId(offsetQueue.getItem(offset(2)));
 
         Iterator<DistributionQueueEntry> removed = queue.remove(Collections.singleton(randomEntryId)).iterator();
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
+        
+        assertClearCallbackInvoked();
         assertEquals(headEntryId, removed.next().getId());
         assertEquals(randomEntryId, removed.next().getId());
         assertFalse(removed.hasNext());
@@ -153,47 +149,79 @@ public class PubQueueTest {
 
     @Test
     public void testRemoveSetOfNonExistingItem() throws Exception {
-        DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE);
-
+        addEntries();
+        
         Iterable<DistributionQueueEntry> removed = queue.remove(Collections.singleton("nonexisting-0@99999"));
+        
         assertFalse(removed.iterator().hasNext());
     }
 
     @Test
     public void testClearAll() throws Exception {
-        final Semaphore invoked = new Semaphore(3);
-        DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release());
+        addEntries();
 
         Iterable<DistributionQueueEntry> removed = queue.clear(-1);
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
-        assertEquals(3, StreamSupport.stream(removed.spliterator(), false).count());
+        
+        assertClearCallbackInvoked();
+        assertEquals(3, streamOf(removed).count());
+        assertEquals(offset(3), lastClearOffset);
     }
 
     @Test
     public void testClearPartial() throws Exception {
-        final Semaphore invoked = new Semaphore(2);
-        DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release());
-
+        addEntries();
+        
         Iterable<DistributionQueueEntry> removed = queue.clear(2);
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
-        assertEquals(2, StreamSupport.stream(removed.spliterator(), false).count());
+        
+        assertClearCallbackInvoked();
+        assertEquals(2, streamOf(removed).count());
+        assertEquals(offset(2), lastClearOffset);
     }
 
     @Test
     public void testGetType() throws Exception {
-        assertEquals(DistributionQueueType.ORDERED, new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, NO_OP).getType());
+        assertEquals(DistributionQueueType.ORDERED, queue.getType());
     }
 
-    private PubQueue pubQueue(OffsetQueue<DistributionQueueItem> offsetQueue) {
-        return pubQueue(offsetQueue, NO_OP);
+    private void assertClearCallbackInvoked() throws InterruptedException {
+        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
     }
 
-    private PubQueue pubQueue(OffsetQueue<DistributionQueueItem> offsetQueue, ClearCallback clearCallback) {
-        return new PubQueue(QUEUE_NAME, offsetQueue, 0, clearCallback);
+    private void addEntries() {
+        offsetQueue.putItem(offset(1), queueItem(1));
+        offsetQueue.putItem(offset(2), queueItem(2));
+        offsetQueue.putItem(offset(3), queueItem(3));
     }
 
+    private DistributionQueueItem queueItem(int nr) {
+        HashMap<String, Object> data = new HashMap<String, Object>(){{
+            put(RECORD_TOPIC, TOPIC);
+            put(RECORD_PARTITION, PARTITION);
+            put(RECORD_OFFSET, offset(nr));
+            put(RECORD_TIMESTAMP, 1541538150580L + nr * 2);
+        }};
+        return new DistributionQueueItem(packageId(nr), data);
+    }
 
+    private long offset(int nr) {
+        return nr * 100;
+    };
+    
+    private static String packageId(int nr) {
+        return PACKAGE_ID_PREFIX + new Integer(nr).toString();
+    }
+
+    private Stream<DistributionQueueEntry> streamOf(Iterable<DistributionQueueEntry> entries) {
+        return StreamSupport.stream(entries.spliterator(), false);
+    }
+
+    private PubQueue pubQueue(OffsetQueue<DistributionQueueItem> offsetQueue) {
+        return new PubQueue(QUEUE_NAME, offsetQueue, 0, this::clearCallback);
+    }
+
+    private void clearCallback(long offset) {
+        log.info("Clearcallback with offset {}", offset);
+        lastClearOffset = offset; 
+        invoked.release();
+    }
 }
\ No newline at end of file


[sling-org-apache-sling-distribution-journal] 02/02: SLING-8997 - Make sure shutdown of ExponentialBackOff happens quickly

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit b7c5e249a79a6f78954a24c9820e4c9efa663171
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Jan 15 10:49:15 2020 +0100

    SLING-8997 - Make sure shutdown of ExponentialBackOff happens quickly
---
 .../journal/impl/shared/ExponentialBackOff.java            |  9 +++++++++
 .../journal/impl/shared/ExponentialBackoffTest.java        | 14 +++++++++++++-
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
index ea7cc28..1338ef4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
@@ -25,6 +25,7 @@ import java.time.Duration;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
@@ -63,7 +64,15 @@ public class ExponentialBackOff implements Closeable {
 
     @Override
     public void close() {
+        log.info("Shutting down exponential backoff executor");
         this.executor.shutdown();
+        this.executor.shutdownNow();
+        try {
+            this.executor.awaitTermination(100, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        log.info("Shutdown completed");
     }
     
     public void startChecks() {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
index a69aed6..55e4c01 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -37,6 +38,7 @@ public class ExponentialBackoffTest {
     private static final int RETRIES = 5;
     private static final Duration INITIAL_DELAY = Duration.of(64, MILLIS);
     private static final Duration MAX_DELAY = Duration.of(256, MILLIS);
+    private static final Duration LONG_DELAY = Duration.of(5, ChronoUnit.SECONDS);
     
     private Logger log = LoggerFactory.getLogger(this.getClass());
     
@@ -68,7 +70,17 @@ public class ExponentialBackoffTest {
         assertThat("Should finish quickly as we called startChecks after enough delay", finished3, equalTo(true));
 
         backOff.close();
-        
+    }
+    
+    /**
+     * Even with a scheduled check the shutdown is expected to be 
+     * fast and to clean up its thread
+     */
+    @Test(timeout = 500)
+    public void testShutdown() throws Exception {
+        ExponentialBackOff backoff = new ExponentialBackOff(LONG_DELAY, LONG_DELAY, false, this::checkCallback);
+        backoff.startChecks();
+        backoff.close(); // We expect this to finish in less than the test timeout
     }
     
     private void checkCallback() {