Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2018/10/29 14:53:43 UTC

svn commit: r1845135 - /jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java

Author: mduerig
Date: Mon Oct 29 14:53:43 2018
New Revision: 1845135

URL: http://svn.apache.org/viewvc?rev=1845135&view=rev
Log:
OAK-7867: Flush thread gets stuck when input stream of binaries block
@Ignored test case
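
For context on the failure mode: OAK-7867 describes a flush thread that never returns because it copies a binary's input stream synchronously, so it inherits whatever blocking (or non-progressing) behaviour that stream has. A minimal sketch of the pattern, using hypothetical names (FlushSketch, flushBinary, sink) rather than Oak's actual segment writer API:

    // Minimal sketch, not Oak's flush code: all names here are illustrative.
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    class FlushSketch {

        private final OutputStream sink; // stands in for the persisted segment store

        FlushSketch(OutputStream sink) {
            this.sink = sink;
        }

        // A flush that drains pending binaries like this is only as live as
        // the streams it reads: if in.read() blocks forever, or keeps
        // reporting zero bytes of progress, the loop never terminates and
        // the flush thread is stuck with it.
        void flushBinary(InputStream in) throws IOException {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                sink.write(buffer, 0, n);
            }
        }
    }

The @Ignored test below reproduces this with a BlockingBlob whose read methods make no progress until unblock() is called, and asserts that FileStore.flush() nevertheless completes within ten seconds.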

Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java?rev=1845135&r1=1845134&r2=1845135&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java Mon Oct 29 14:53:43 2018
@@ -19,21 +19,34 @@
 package org.apache.jackrabbit.oak.segment.file;
 
 import static com.google.common.collect.Maps.newLinkedHashMap;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
 import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.util.concurrent.Monitor;
+import com.google.common.util.concurrent.Monitor.Guard;
 import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.memory.AbstractBlob;
 import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
 import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
 import org.apache.jackrabbit.oak.segment.DefaultSegmentWriter;
@@ -44,6 +57,8 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -237,4 +252,102 @@ public class FileStoreIT {
         }
     }
 
+    @Ignore("OAK-7867")
+    @Test
+    public void blockingBlob() throws Exception {
+
+        /* A blob that blocks on read until unblocked */
+        class BlockingBlob extends AbstractBlob {
+            private final AtomicBoolean blocking = new AtomicBoolean(true);
+            private final Monitor readMonitor = new Monitor();
+            private boolean reading = false;
+
+            public boolean waitForRead(int time, TimeUnit unit) throws InterruptedException {
+                readMonitor.enter();
+                try {
+                    return readMonitor.waitFor(new Guard(readMonitor) {
+                        @Override
+                        public boolean isSatisfied() {
+                            return reading;
+                        }
+                    }, time, unit);
+                } finally {
+                    readMonitor.leave();
+                }
+            }
+
+            public void unblock() {
+                blocking.set(false);
+            }
+
+            @NotNull
+            @Override
+            public InputStream getNewStream() {
+                return new InputStream() {
+
+                    @Override
+                    public int read() throws IOException {
+                        return readOrEnd();
+                    }
+
+                    @Override
+                    public int read(@NotNull byte[] b, int off, int len) throws IOException {
+                        return readOrEnd();
+                    }
+
+                    private int readOrEnd() {
+                        if (blocking.get()) {
+                            reading = true;
+                            return 0;
+                        } else {
+                            return -1;
+                        }
+                    }
+                };
+            }
+
+            @Override
+            public long length() {
+                return 1;
+            }
+        }
+
+        ExecutorService updateExecutor = newSingleThreadExecutor();
+        ExecutorService flushExecutor = newSingleThreadExecutor();
+        try (FileStore store = fileStoreBuilder(getFileStoreFolder()).build()) {
+
+            // A blob whose stream blocks on read
+            BlockingBlob blockingBlob = new BlockingBlob();
+
+            // Use a background thread to add the blocking blob to a property
+            updateExecutor.submit(() -> {
+                SegmentNodeState root = store.getHead();
+                SegmentNodeBuilder builder = root.builder();
+                builder.setProperty("blockingBlob", blockingBlob);
+                store.getRevisions().setHead(root.getRecordId(), builder.getNodeState().getRecordId());
+            });
+
+            // Wait for reading on the blob to block
+            assertTrue(blockingBlob.waitForRead(1, SECONDS));
+
+            // In another background thread flush the file store
+            Future<Void> flushed = flushExecutor.submit(() -> {
+                store.flush();
+                return null;
+            });
+
+            // Flush should not get blocked by the blob blocked on reading
+            try {
+                flushed.get(10, SECONDS);
+            } catch (TimeoutException e) {
+                fail("Flush must not block");
+            } finally {
+                blockingBlob.unblock();
+            }
+        } finally {
+            flushExecutor.shutdown();
+            updateExecutor.shutdown();
+        }
+    }
+
 }
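
A note on the synchronization inside BlockingBlob: Guava's Monitor is designed for guarded state that is mutated only while a thread occupies the monitor, because it is leaving the monitor that signals threads blocked in waitFor(). In the test, readOrEnd() flips the non-volatile reading field without entering readMonitor, so the waiter is never signalled promptly; the timed waitFor() re-evaluates its guard when the wait expires, which is why the one-second assertion passes in practice, but strictly the unsynchronized write is a data race. For comparison, a sketch of the conventional shape (a hypothetical ReadFlag helper, not a proposed patch to the test):

    import com.google.common.util.concurrent.Monitor;
    import com.google.common.util.concurrent.Monitor.Guard;
    import java.util.concurrent.TimeUnit;

    // Sketch of the usual Guava Monitor idiom: guarded state is written
    // only while occupying the monitor, so leave() signals waiters.
    class ReadFlag {

        private final Monitor monitor = new Monitor();
        private boolean reading = false;

        private final Guard isReading = new Guard(monitor) {
            @Override
            public boolean isSatisfied() {
                return reading;
            }
        };

        // Called from the stream's read path.
        void markReading() {
            monitor.enter();
            try {
                reading = true;
            } finally {
                monitor.leave(); // leaving re-evaluates guards and signals waiters
            }
        }

        // Returns true as soon as markReading() has run, false on timeout.
        boolean waitForRead(long time, TimeUnit unit) throws InterruptedException {
            if (monitor.enterWhen(isReading, time, unit)) {
                monitor.leave();
                return true;
            }
            return false;
        }
    }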