You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mi...@apache.org on 2021/12/01 09:35:14 UTC
[jackrabbit-oak] branch trunk updated: OAK-8440 Fix flaky SegmentWriteQueueTest#testFlush
This is an automated email from the ASF dual-hosted git repository.
miroslav pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 96f57f6 OAK-8440 Fix flaky SegmentWriteQueueTest#testFlush
new 9125599 Merge pull request #430 from jelmini/issues/fix-segment-write-queue-test
96f57f6 is described below
commit 96f57f68444224eb4b4052bc30334857fb95f79b
Author: Carlo Jelmini <je...@adobe.com>
AuthorDate: Tue Nov 30 16:21:22 2021 +0100
OAK-8440 Fix flaky SegmentWriteQueueTest#testFlush
Remove ConcurrentModificationException by using CopyOnWriteArraySet and
ensuring progress conditions are actually met.
Misc code improvements to reduce code duplication.
---
.../remote/queue/SegmentWriteQueueTest.java | 90 +++++++++++++---------
1 file changed, 54 insertions(+), 36 deletions(-)
diff --git a/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
index c7e3746..4efe2f2 100644
--- a/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
+++ b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.jackrabbit.oak.segment.remote.queue;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
import org.junit.After;
import org.junit.Test;
@@ -35,6 +38,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import static java.util.stream.Collectors.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -92,7 +96,7 @@ public class SegmentWriteQueueTest {
semaphore.release(Integer.MAX_VALUE);
AtomicBoolean flushFinished = new AtomicBoolean(false);
- Thread flusher = new Thread(() -> {
+ Thread flusher = runInThread(() -> {
try {
queueBlocked.flush();
flushFinished.set(true);
@@ -100,7 +104,6 @@ public class SegmentWriteQueueTest {
throw new UncheckedIOException(e);
}
});
- flusher.start();
Thread.sleep(1000);
@@ -130,11 +133,9 @@ public class SegmentWriteQueueTest {
assertFalse("Queue shouldn't be empty", queue.isEmpty());
semaphore.release(Integer.MAX_VALUE);
- while (!queue.isEmpty()) {
- Thread.sleep(10);
- }
+ awaitUntil(() -> queue.isEmpty());
- assertEquals("There should be 10 segments consumed",10, added.size());
+ assertEquals("There should be 10 segments consumed", 10, added.size());
for (int i = 0; i < 10; i++) {
assertTrue("Missing consumed segment", added.contains(uuid(i)));
}
@@ -142,7 +143,7 @@ public class SegmentWriteQueueTest {
@Test(timeout = 1000)
public void testFlush() throws IOException, InterruptedException {
- Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
+ Set<UUID> added = new CopyOnWriteArraySet<>();
Semaphore semaphore = new Semaphore(0);
queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
try {
@@ -158,47 +159,49 @@ public class SegmentWriteQueueTest {
AtomicBoolean flushFinished = new AtomicBoolean(false);
Set<UUID> addedAfterFlush = new HashSet<>();
- new Thread(() -> {
+ runInThread(() -> {
try {
queue.flush();
- flushFinished.set(true);
addedAfterFlush.addAll(added);
+ flushFinished.set(true);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
- }).start();
+ });
Thread.sleep(100);
assertFalse("Flush should be blocked", flushFinished.get());
AtomicBoolean addFinished = new AtomicBoolean(false);
- new Thread(() -> {
+ runInThread(() -> {
try {
queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
addFinished.set(true);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
- }).start();
+ });
Thread.sleep(100);
assertFalse("Adding segments should be blocked until the flush is finished", addFinished.get());
semaphore.release(Integer.MAX_VALUE);
- while (!addFinished.get()) {
- Thread.sleep(10);
- }
- assertTrue("Flush should be finished once the ", flushFinished.get());
+ awaitUntil(addFinished);
+ awaitUntil(flushFinished);
+ assertTrue("Flush should be finished", flushFinished.get());
assertTrue("Adding segments should be blocked until the flush is finished", addFinished.get());
- for (int i = 0; i < 3; i++) {
- assertTrue(addedAfterFlush.contains(uuid(i)));
- }
+ Set<UUID> expectedAddedAfterFlush = IntStream.range(0, 3)
+ .mapToObj(SegmentWriteQueueTest::uuid)
+ .collect(toSet());
+
+ assertTrue("Expected all values of " + expectedAddedAfterFlush + " to be in " + addedAfterFlush,
+ addedAfterFlush.containsAll(expectedAddedAfterFlush));
}
@Test(expected = IllegalStateException.class)
- public void testClose() throws IOException, InterruptedException {
+ public void testClose() throws IOException {
queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {});
queue.close();
queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
@@ -233,39 +236,33 @@ public class SegmentWriteQueueTest {
assertEquals(9, queue.getSize()); // the 10th segment is handled by the recovery thread
writeAttempts.clear();
- while (writeAttempts.size() < 5) {
- Thread.sleep(100);
- }
+ awaitWhile(() -> writeAttempts.size() < 5);
long lastAttempt = writeAttempts.get(0);
for (int i = 1; i < 5; i++) {
long delay = writeAttempts.get(i) - lastAttempt;
- assertTrue("The delay between attempts to persist segment should be larger than 1s. Actual: " + delay, delay >= 1000);
+ assertTrue("The delay between attempts to persist segment should be larger than 1000ms. Actual: " + delay, delay >= 1000);
lastAttempt = writeAttempts.get(i);
}
AtomicBoolean addFinished = new AtomicBoolean(false);
- new Thread(() -> {
+ runInThread(() -> {
try {
queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
addFinished.set(true);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
- }).start();
+ });
Thread.sleep(100);
assertFalse("Adding segments should be blocked until the recovery mode is finished", addFinished.get());
doBreak.set(false);
- while (queue.isBroken()) {
- Thread.sleep(10);
- }
+ awaitWhile(() -> queue.isBroken());
assertFalse("Queue shouldn't be broken anymore", queue.isBroken());
- while (added.size() < 11) {
- Thread.sleep(10);
- }
- assertEquals("All segments should be consumed",11, added.size());
+ awaitWhile(() -> added.size() < 11);
+ assertEquals("All segments should be consumed", 11, added.size());
for (int i = 0; i < 11; i++) {
assertTrue("All segments should be consumed", added.contains(uuid(i)));
}
@@ -280,7 +277,7 @@ public class SegmentWriteQueueTest {
}
@Test
- public void testRuntimeExceptionInSegmentConsumer() throws InterruptedException, NoSuchFieldException, IOException {
+ public void testRuntimeExceptionInSegmentConsumer() throws InterruptedException, IOException {
Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
AtomicBoolean doBreak = new AtomicBoolean(true);
@@ -298,7 +295,7 @@ public class SegmentWriteQueueTest {
queue.addToQueue(tarEntry(2), EMPTY_DATA, 0, 0);
AtomicBoolean flushFinished = new AtomicBoolean(false);
- Thread flusher = new Thread(() -> {
+ runInThread(() -> {
try {
queue.flush();
flushFinished.set(true);
@@ -306,7 +303,6 @@ public class SegmentWriteQueueTest {
throw new UncheckedIOException(e);
}
});
- flusher.start();
Thread.sleep(100);
@@ -331,4 +327,26 @@ public class SegmentWriteQueueTest {
return new UUID(0, i);
}
+ private void awaitUntil(AtomicBoolean condition) throws InterruptedException {
+ awaitUntil(condition::get);
+ }
+
+ private void awaitUntil(Supplier<Boolean> condition) throws InterruptedException {
+ awaitWhile(() -> !condition.get());
+ }
+
+ private void awaitWhile(Supplier<Boolean> condition) throws InterruptedException {
+ int maxAttempts = 5000;
+ int attempts = 0;
+ while (condition.get() && attempts < maxAttempts) {
+ attempts++;
+ Thread.sleep(10);
+ }
+ }
+
+ private static Thread runInThread(Runnable runnable) {
+ Thread thread = new Thread(runnable);
+ thread.start();
+ return thread;
+ }
}