You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mi...@apache.org on 2021/12/01 09:35:14 UTC

[jackrabbit-oak] branch trunk updated: OAK-8440 Fix flaky SegmentWriteQueueTest#testFlush

This is an automated email from the ASF dual-hosted git repository.

miroslav pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 96f57f6  OAK-8440 Fix flaky SegmentWriteQueueTest#testFlush
     new 9125599  Merge pull request #430 from jelmini/issues/fix-segment-write-queue-test
96f57f6 is described below

commit 96f57f68444224eb4b4052bc30334857fb95f79b
Author: Carlo Jelmini <je...@adobe.com>
AuthorDate: Tue Nov 30 16:21:22 2021 +0100

    OAK-8440 Fix flaky SegmentWriteQueueTest#testFlush
    
    Remove ConcurrentModificationException by using CopyOnWriteArraySet and
    ensuring progress conditions are actually met.
    Misc code improvements to reduce code duplication.
---
 .../remote/queue/SegmentWriteQueueTest.java        | 90 +++++++++++++---------
 1 file changed, 54 insertions(+), 36 deletions(-)

diff --git a/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
index c7e3746..4efe2f2 100644
--- a/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
+++ b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.jackrabbit.oak.segment.remote.queue;
 
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
 import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
 import org.junit.After;
 import org.junit.Test;
@@ -35,6 +38,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.util.stream.Collectors.toSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -92,7 +96,7 @@ public class SegmentWriteQueueTest {
         semaphore.release(Integer.MAX_VALUE);
 
         AtomicBoolean flushFinished = new AtomicBoolean(false);
-        Thread flusher = new Thread(() -> {
+        Thread flusher = runInThread(() -> {
             try {
                 queueBlocked.flush();
                 flushFinished.set(true);
@@ -100,7 +104,6 @@ public class SegmentWriteQueueTest {
                 throw new UncheckedIOException(e);
             }
         });
-        flusher.start();
 
         Thread.sleep(1000);
 
@@ -130,11 +133,9 @@ public class SegmentWriteQueueTest {
         assertFalse("Queue shouldn't be empty", queue.isEmpty());
 
         semaphore.release(Integer.MAX_VALUE);
-        while (!queue.isEmpty()) {
-            Thread.sleep(10);
-        }
+        awaitUntil(() -> queue.isEmpty());
 
-        assertEquals("There should be 10 segments consumed",10, added.size());
+        assertEquals("There should be 10 segments consumed", 10, added.size());
         for (int i = 0; i < 10; i++) {
             assertTrue("Missing consumed segment", added.contains(uuid(i)));
         }
@@ -142,7 +143,7 @@ public class SegmentWriteQueueTest {
 
     @Test(timeout = 1000)
     public void testFlush() throws IOException, InterruptedException {
-        Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
+        Set<UUID> added = new CopyOnWriteArraySet<>();
         Semaphore semaphore = new Semaphore(0);
         queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
             try {
@@ -158,47 +159,49 @@ public class SegmentWriteQueueTest {
 
         AtomicBoolean flushFinished = new AtomicBoolean(false);
         Set<UUID> addedAfterFlush = new HashSet<>();
-        new Thread(() -> {
+        runInThread(() -> {
             try {
                 queue.flush();
-                flushFinished.set(true);
                 addedAfterFlush.addAll(added);
+                flushFinished.set(true);
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
             }
-        }).start();
+        });
 
         Thread.sleep(100);
         assertFalse("Flush should be blocked", flushFinished.get());
 
         AtomicBoolean addFinished = new AtomicBoolean(false);
-        new Thread(() -> {
+        runInThread(() -> {
             try {
                 queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
                 addFinished.set(true);
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
             }
-        }).start();
+        });
 
         Thread.sleep(100);
         assertFalse("Adding segments should be blocked until the flush is finished", addFinished.get());
 
         semaphore.release(Integer.MAX_VALUE);
 
-        while (!addFinished.get()) {
-            Thread.sleep(10);
-        }
-        assertTrue("Flush should be finished once the ", flushFinished.get());
+        awaitUntil(addFinished);
+        awaitUntil(flushFinished);
+        assertTrue("Flush should be finished", flushFinished.get());
         assertTrue("Adding segments should be blocked until the flush is finished", addFinished.get());
 
-        for (int i = 0; i < 3; i++) {
-            assertTrue(addedAfterFlush.contains(uuid(i)));
-        }
+        Set<UUID> expectedAddedAfterFlush = IntStream.range(0, 3)
+            .mapToObj(SegmentWriteQueueTest::uuid)
+            .collect(toSet());
+
+        assertTrue("Expected all values of " + expectedAddedAfterFlush + " to be in " + addedAfterFlush,
+            addedAfterFlush.containsAll(expectedAddedAfterFlush));
     }
 
     @Test(expected = IllegalStateException.class)
-    public void testClose() throws IOException, InterruptedException {
+    public void testClose() throws IOException {
         queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {});
         queue.close();
         queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
@@ -233,39 +236,33 @@ public class SegmentWriteQueueTest {
         assertEquals(9, queue.getSize()); // the 10th segment is handled by the recovery thread
 
         writeAttempts.clear();
-        while (writeAttempts.size() < 5) {
-            Thread.sleep(100);
-        }
+        awaitWhile(() -> writeAttempts.size() < 5);
         long lastAttempt = writeAttempts.get(0);
         for (int i = 1; i < 5; i++) {
             long delay = writeAttempts.get(i) - lastAttempt;
-            assertTrue("The delay between attempts to persist segment should be larger than 1s. Actual: " + delay, delay >= 1000);
+            assertTrue("The delay between attempts to persist segment should be larger than 1000ms. Actual: " + delay, delay >= 1000);
             lastAttempt = writeAttempts.get(i);
         }
 
         AtomicBoolean addFinished = new AtomicBoolean(false);
-        new Thread(() -> {
+        runInThread(() -> {
             try {
                 queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
                 addFinished.set(true);
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
             }
-        }).start();
+        });
 
         Thread.sleep(100);
         assertFalse("Adding segments should be blocked until the recovery mode is finished", addFinished.get());
 
         doBreak.set(false);
-        while (queue.isBroken()) {
-            Thread.sleep(10);
-        }
+        awaitWhile(() -> queue.isBroken());
         assertFalse("Queue shouldn't be broken anymore", queue.isBroken());
 
-        while (added.size() < 11) {
-            Thread.sleep(10);
-        }
-        assertEquals("All segments should be consumed",11, added.size());
+        awaitWhile(() -> added.size() < 11);
+        assertEquals("All segments should be consumed", 11, added.size());
         for (int i = 0; i < 11; i++) {
             assertTrue("All segments should be consumed", added.contains(uuid(i)));
         }
@@ -280,7 +277,7 @@ public class SegmentWriteQueueTest {
     }
 
     @Test
-    public void testRuntimeExceptionInSegmentConsumer() throws InterruptedException, NoSuchFieldException, IOException {
+    public void testRuntimeExceptionInSegmentConsumer() throws InterruptedException, IOException {
 
         Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
         AtomicBoolean doBreak = new AtomicBoolean(true);
@@ -298,7 +295,7 @@ public class SegmentWriteQueueTest {
         queue.addToQueue(tarEntry(2), EMPTY_DATA, 0, 0);
 
         AtomicBoolean flushFinished = new AtomicBoolean(false);
-        Thread flusher = new Thread(() -> {
+        runInThread(() -> {
             try {
                 queue.flush();
                 flushFinished.set(true);
@@ -306,7 +303,6 @@ public class SegmentWriteQueueTest {
                 throw new UncheckedIOException(e);
             }
         });
-        flusher.start();
 
         Thread.sleep(100);
 
@@ -331,4 +327,26 @@ public class SegmentWriteQueueTest {
         return new UUID(0, i);
     }
 
+    private void awaitUntil(AtomicBoolean condition) throws InterruptedException {
+        awaitUntil(condition::get);
+    }
+
+    private void awaitUntil(Supplier<Boolean> condition) throws InterruptedException {
+        awaitWhile(() -> !condition.get());
+    }
+
+    private void awaitWhile(Supplier<Boolean> condition) throws InterruptedException {
+        int maxAttempts = 5000;
+        int attempts = 0;
+        while (condition.get() && attempts < maxAttempts) {
+            attempts++;
+            Thread.sleep(10);
+        }
+    }
+
+    private static Thread runInThread(Runnable runnable) {
+        Thread thread = new Thread(runnable);
+        thread.start();
+        return thread;
+    }
 }