You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2018/10/29 14:53:43 UTC
svn commit: r1845135 -
/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java
Author: mduerig
Date: Mon Oct 29 14:53:43 2018
New Revision: 1845135
URL: http://svn.apache.org/viewvc?rev=1845135&view=rev
Log:
OAK-7867: Flush thread gets stuck when input stream of binaries block
@Ignored test case
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java?rev=1845135&r1=1845134&r2=1845135&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java Mon Oct 29 14:53:43 2018
@@ -19,21 +19,34 @@
package org.apache.jackrabbit.oak.segment.file;
import static com.google.common.collect.Maps.newLinkedHashMap;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.Monitor;
+import com.google.common.util.concurrent.Monitor.Guard;
import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.memory.AbstractBlob;
import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
import org.apache.jackrabbit.oak.segment.DefaultSegmentWriter;
@@ -44,6 +57,8 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -237,4 +252,102 @@ public class FileStoreIT {
}
}
+ @Ignore("OAK-7867")
+ @Test
+ public void blockingBlob() throws Exception {
+
+ /* A blob that blocks on read until unblocked */
+ class BlockingBlob extends AbstractBlob {
+ private final AtomicBoolean blocking = new AtomicBoolean(true);
+ private final Monitor readMonitor = new Monitor();
+ private boolean reading = false;
+
+ public boolean waitForRead(int time, TimeUnit unit) throws InterruptedException {
+ readMonitor.enter();
+ try {
+ return readMonitor.waitFor(new Guard(readMonitor) {
+ @Override
+ public boolean isSatisfied() {
+ return reading;
+ }
+ }, time, unit);
+ } finally {
+ readMonitor.leave();
+ }
+ }
+
+ public void unblock() {
+ blocking.set(false);
+ }
+
+ @NotNull
+ @Override
+ public InputStream getNewStream() {
+ return new InputStream() {
+
+ @Override
+ public int read() throws IOException {
+ return readOrEnd();
+ }
+
+ @Override
+ public int read(@NotNull byte[] b, int off, int len) throws IOException {
+ return readOrEnd();
+ }
+
+ private int readOrEnd() {
+ if (blocking.get()) {
+ reading = true;
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+ };
+ }
+
+ @Override
+ public long length() {
+ return 1;
+ }
+ }
+
+ ExecutorService updateExecutor = newSingleThreadExecutor();
+ ExecutorService flushExecutor = newSingleThreadExecutor();
+ try (FileStore store = fileStoreBuilder(getFileStoreFolder()).build()) {
+
+ // A blob whose stream blocks on read
+ BlockingBlob blockingBlob = new BlockingBlob();
+
+ // Use a background thread to add the blocking blob to a property
+ updateExecutor.submit(() -> {
+ SegmentNodeState root = store.getHead();
+ SegmentNodeBuilder builder = root.builder();
+ builder.setProperty("blockingBlob", blockingBlob);
+ store.getRevisions().setHead(root.getRecordId(), builder.getNodeState().getRecordId());
+ });
+
+ // Wait for reading on the blob to block
+ assertTrue(blockingBlob.waitForRead(1, SECONDS));
+
+ // In another background thread flush the file store
+ Future<Void> flushed = flushExecutor.submit(() -> {
+ store.flush();
+ return null;
+ });
+
+ // Flush should not get blocked by the blob blocked on reading
+ try {
+ flushed.get(10, SECONDS);
+ } catch (TimeoutException e) {
+ fail("Flush must not block");
+ } finally {
+ blockingBlob.unblock();
+ }
+ } finally {
+ flushExecutor.shutdown();
+ updateExecutor.shutdown();
+ }
+ }
+
}