Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2018/11/01 08:26:51 UTC
svn commit: r1845405 - in /jackrabbit/oak/trunk/oak-segment-tar/src:
main/java/org/apache/jackrabbit/oak/segment/
test/java/org/apache/jackrabbit/oak/segment/
test/java/org/apache/jackrabbit/oak/segment/file/
Author: mduerig
Date: Thu Nov 1 08:26:51 2018
New Revision: 1845405
URL: http://svn.apache.org/viewvc?rev=1845405&view=rev
Log:
OAK-7867: Flush thread gets stuck when the input stream of a binary blocks
Push write operations into the segment writer
Generalise handling of GCGeneration
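
The shape of the reworked contract is easier to see outside the diff. The sketch below condenses it into one compilable file; GCGeneration, RecordId, SegmentBufferWriter and RecordWriter are pared-down stand-ins for the oak-segment-tar classes (the real RecordWriter.write also takes the segment store), so this is illustrative only, not the actual API.

    import java.io.IOException;

    final class WriteContractSketch {

        static final class GCGeneration {}
        static final class RecordId {}
        static final class SegmentBufferWriter {}

        interface RecordWriter {
            RecordId write(SegmentBufferWriter writer) throws IOException;
        }

        interface WriteOperation {
            RecordId execute(SegmentBufferWriter writer) throws IOException;
        }

        interface WriteOperationHandler {
            // New: the handler exposes the store's current generation ...
            GCGeneration getGCGeneration();

            // ... and every execution is pinned to an explicit generation.
            RecordId execute(GCGeneration gcGeneration, WriteOperation operation)
                    throws IOException;
        }

        // Before this change each call site built an anonymous
        // SegmentWriteOperation; now the generation is read once when the
        // logical write starts and the operation itself is a lambda.
        static RecordId write(WriteOperationHandler handler, RecordWriter recordWriter)
                throws IOException {
            GCGeneration generation = handler.getGCGeneration();
            return handler.execute(generation, recordWriter::write);
        }
    }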
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java?rev=1845405&r1=1845404&r2=1845405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java Thu Nov 1 08:26:51 2018
@@ -65,6 +65,7 @@ import org.apache.jackrabbit.oak.api.Blo
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.memory.ModifiedNodeState;
+import org.apache.jackrabbit.oak.segment.RecordWriters.RecordWriter;
import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -140,107 +141,53 @@ public class DefaultSegmentWriter implem
@NotNull
RecordId writeMap(@Nullable final MapRecord base, @NotNull final Map<String, RecordId> changes) throws IOException {
- return writeOperationHandler.execute(new SegmentWriteOperation() {
-
- @NotNull
- @Override
- public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
- return with(writer).writeMap(base, changes);
- }
-
- });
+ return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+ .writeMap(base, changes);
}
@NotNull
RecordId writeList(@NotNull final List<RecordId> list) throws IOException {
- return writeOperationHandler.execute(new SegmentWriteOperation() {
-
- @NotNull
- @Override
- public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
- return with(writer).writeList(list);
- }
-
- });
+ return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+ .writeList(list);
}
@NotNull
RecordId writeString(@NotNull final String string) throws IOException {
- return writeOperationHandler.execute(new SegmentWriteOperation() {
-
- @NotNull
- @Override
- public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
- return with(writer).writeString(string);
- }
-
- });
+ return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+ .writeString(string);
}
@Override
@NotNull
public RecordId writeBlob(@NotNull final Blob blob) throws IOException {
- return writeOperationHandler.execute(new SegmentWriteOperation() {
-
- @NotNull
- @Override
- public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
- return with(writer).writeBlob(blob);
- }
-
- });
+ return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+ .writeBlob(blob);
}
@NotNull
RecordId writeBlock(@NotNull final byte[] bytes, final int offset, final int length) throws IOException {
- return writeOperationHandler.execute(new SegmentWriteOperation() {
-
- @NotNull
- @Override
- public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
- return with(writer).writeBlock(bytes, offset, length);
- }
-
- });
+ return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+ .writeBlock(bytes, offset, length);
}
@Override
@NotNull
public RecordId writeStream(@NotNull final InputStream stream) throws IOException {
- return writeOperationHandler.execute(new SegmentWriteOperation() {
-
- @NotNull
- @Override
- public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
- return with(writer).writeStream(stream);
- }
- });
+ return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+ .writeStream(stream);
}
@NotNull
RecordId writeProperty(@NotNull final PropertyState state) throws IOException {
- return writeOperationHandler.execute(new SegmentWriteOperation() {
-
- @NotNull
- @Override
- public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
- return with(writer).writeProperty(state);
- }
- });
+ return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+ .writeProperty(state);
}
@Override
@NotNull
public RecordId writeNode(@NotNull final NodeState state, @Nullable final ByteBuffer stableIdBytes) throws IOException {
- return writeOperationHandler.execute(new SegmentWriteOperation() {
-
- @NotNull
- @Override
- public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
- return with(writer).writeNode(state, stableIdBytes);
- }
-
- });
+ return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+ .writeNode(state, stableIdBytes);
}
/**
@@ -249,28 +196,25 @@ public class DefaultSegmentWriter implem
* as a separate argument (a poor man's monad). As such it is entirely
* <em>not thread safe</em>.
*/
- private abstract class SegmentWriteOperation implements WriteOperation {
- private SegmentBufferWriter writer;
-
- private Cache<String, RecordId> stringCache;
+ private class SegmentWriteOperation {
+ private final GCGeneration gcGeneration;
- private Cache<Template, RecordId> templateCache;
+ private final Cache<String, RecordId> stringCache;
- private Cache<String, RecordId> nodeCache;
+ private final Cache<Template, RecordId> templateCache;
- @NotNull
- @Override
- public abstract RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException;
+ private final Cache<String, RecordId> nodeCache;
- @NotNull
- SegmentWriteOperation with(@NotNull SegmentBufferWriter writer) {
- checkState(this.writer == null);
- this.writer = writer;
- int generation = writer.getGCGeneration().getGeneration();
+ SegmentWriteOperation(@NotNull GCGeneration gcGeneration) {
+ int generation = gcGeneration.getGeneration();
+ this.gcGeneration = gcGeneration;
this.stringCache = cacheManager.getStringCache(generation);
this.templateCache = cacheManager.getTemplateCache(generation);
this.nodeCache = cacheManager.getNodeCache(generation);
- return this;
+ }
+
+ private WriteOperation newWriteOperation(RecordWriter recordWriter) {
+ return writer -> recordWriter.write(writer, store);
}
private RecordId writeMap(@Nullable MapRecord base,
@@ -297,8 +241,10 @@ public class DefaultSegmentWriter implem
if (value.equals(entry.getValue())) {
return base.getRecordId();
} else {
- return RecordWriters.newMapBranchWriter(entry.getHash(), asList(entry.getKey(),
- value, base.getRecordId())).write(writer, store);
+ return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ RecordWriters.newMapBranchWriter(
+ entry.getHash(),
+ asList(entry.getKey(), value, base.getRecordId()))));
}
}
}
@@ -332,7 +278,8 @@ public class DefaultSegmentWriter implem
checkElementIndex(size, MapRecord.MAX_SIZE);
checkPositionIndex(level, MapRecord.MAX_NUMBER_OF_LEVELS);
checkArgument(size != 0 || level == MapRecord.MAX_NUMBER_OF_LEVELS);
- return RecordWriters.newMapLeafWriter(level, entries).write(writer, store);
+ return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ RecordWriters.newMapLeafWriter(level, entries)));
}
private RecordId writeMapBranch(int level, int size, MapRecord... buckets) throws IOException {
@@ -344,7 +291,8 @@ public class DefaultSegmentWriter implem
bucketIds.add(buckets[i].getRecordId());
}
}
- return RecordWriters.newMapBranchWriter(level, size, bitmap, bucketIds).write(writer, store);
+ return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ RecordWriters.newMapBranchWriter(level, size, bitmap, bucketIds)));
}
private RecordId writeMapBucket(MapRecord base, Collection<MapEntry> entries, int level)
@@ -354,7 +302,8 @@ public class DefaultSegmentWriter implem
if (base != null) {
return base.getRecordId();
} else if (level == 0) {
- return RecordWriters.newMapLeafWriter().write(writer, store);
+ return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ RecordWriters.newMapLeafWriter()));
} else {
return null;
}
@@ -463,7 +412,8 @@ public class DefaultSegmentWriter implem
private RecordId writeListBucket(List<RecordId> bucket) throws IOException {
checkArgument(bucket.size() > 1);
- return RecordWriters.newListBucketWriter(bucket).write(writer, store);
+ return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ RecordWriters.newListBucketWriter(bucket)));
}
private List<List<MapEntry>> splitToBuckets(Collection<MapEntry> entries, int level) {
@@ -486,12 +436,14 @@ public class DefaultSegmentWriter implem
private RecordId writeValueRecord(long length, RecordId blocks) throws IOException {
long len = (length - Segment.MEDIUM_LIMIT) | (0x3L << 62);
- return RecordWriters.newValueWriter(blocks, len).write(writer, store);
+ return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ RecordWriters.newValueWriter(blocks, len)));
}
private RecordId writeValueRecord(int length, byte... data) throws IOException {
checkArgument(length < Segment.MEDIUM_LIMIT);
- return RecordWriters.newValueWriter(length, data).write(writer, store);
+ return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ RecordWriters.newValueWriter(length, data)));
}
/**
@@ -589,23 +541,22 @@ public class DefaultSegmentWriter implem
private RecordId writeBlobId(String blobId) throws IOException {
byte[] data = blobId.getBytes(UTF_8);
- RecordId recordId;
-
if (data.length < Segment.BLOB_ID_SMALL_LIMIT) {
- recordId = RecordWriters.newBlobIdWriter(data).write(writer, store);
+ return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ RecordWriters.newBlobIdWriter(data)));
} else {
RecordId refId = writeString(blobId);
- recordId = RecordWriters.newBlobIdWriter(refId).write(writer, store);
+ return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ RecordWriters.newBlobIdWriter(refId)));
}
-
- return recordId;
}
private RecordId writeBlock(@NotNull byte[] bytes, int offset, int length)
throws IOException {
checkNotNull(bytes);
checkPositionIndexes(offset, offset + length, bytes.length);
- return RecordWriters.newBlockWriter(bytes, offset, length).write(writer, store);
+ return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ RecordWriters.newBlockWriter(bytes, offset, length)));
}
private RecordId writeStream(@NotNull InputStream stream) throws IOException {
@@ -705,9 +656,12 @@ public class DefaultSegmentWriter implem
if (!type.isArray()) {
return valueIds.iterator().next();
} else if (count == 0) {
- return RecordWriters.newListWriter().write(writer, store);
+ return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ RecordWriters.newListWriter()));
} else {
- return RecordWriters.newListWriter(count, writeList(valueIds)).write(writer, store);
+ RecordId lid = writeList(valueIds);
+ return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ RecordWriters.newListWriter(count, lid)));
}
}
@@ -778,9 +732,10 @@ public class DefaultSegmentWriter implem
checkState(propertyNames.length < (1 << 18));
head |= propertyNames.length;
- RecordId tid = RecordWriters.newTemplateWriter(ids, propertyNames,
- propertyTypes, head, primaryId, mixinIds, childNameId,
- propNamesId).write(writer, store);
+ RecordId tid = writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ RecordWriters.newTemplateWriter(
+ ids, propertyNames, propertyTypes, head, primaryId, mixinIds,
+ childNameId, propNamesId)));
templateCache.put(template, tid);
return tid;
}
@@ -916,14 +871,18 @@ public class DefaultSegmentWriter implem
ids.add(writeList(pIds));
}
- RecordId stableId = null;
+ RecordId stableId;
if (stableIdBytes != null) {
ByteBuffer buffer = stableIdBytes.duplicate();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
stableId = writeBlock(bytes, 0, bytes.length);
+ } else {
+ stableId = null;
}
- return newNodeStateWriter(stableId, ids).write(writer, store);
+
+ return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+ newNodeStateWriter(stableId, ids)));
}
/**
@@ -983,7 +942,7 @@ public class DefaultSegmentWriter implem
private boolean isOldGeneration(RecordId id) {
try {
GCGeneration thatGen = id.getSegmentId().getGcGeneration();
- GCGeneration thisGen = writer.getGCGeneration();
+ GCGeneration thisGen = gcGeneration;
if (thatGen.isCompacted()) {
// If the segment containing the base state is compacted it is
// only considered old if it is from an earlier full generation.
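
The core of the DefaultSegmentWriter rework: SegmentWriteOperation is no longer an anonymous WriteOperation that has a writer injected via with(writer); it captures the GCGeneration and the generation-scoped caches once, at construction. A minimal compilable sketch of that shape (types are simplified stand-ins; the real newWriteOperation also passes the segment store to the record writer):

    import java.io.IOException;
    import java.util.function.IntFunction;

    final class SegmentWriteOperationSketch {

        static final class GCGeneration {
            private final int generation;
            GCGeneration(int generation) { this.generation = generation; }
            int getGeneration() { return generation; }
        }
        static final class RecordId {}
        static final class SegmentBufferWriter {}
        interface Cache<K, V> {}
        interface RecordWriter {
            RecordId write(SegmentBufferWriter writer) throws IOException;
        }
        interface WriteOperation {
            RecordId execute(SegmentBufferWriter writer) throws IOException;
        }

        private final GCGeneration gcGeneration;
        private final Cache<String, RecordId> stringCache;

        SegmentWriteOperationSketch(GCGeneration gcGeneration,
                IntFunction<Cache<String, RecordId>> stringCachesByGeneration) {
            // The generation is fixed exactly once, when the logical write
            // begins; all generation-scoped caches are resolved against it,
            // even if a GC cycle completes while the operation is running.
            this.gcGeneration = gcGeneration;
            this.stringCache =
                    stringCachesByGeneration.apply(gcGeneration.getGeneration());
        }

        // Adapter from a stateless RecordWriter to a WriteOperation; the
        // handler later supplies a SegmentBufferWriter whose generation
        // matches the one captured above.
        WriteOperation newWriteOperation(RecordWriter recordWriter) {
            return recordWriter::write;
        }
    }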
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java?rev=1845405&r1=1845404&r2=1845405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java Thu Nov 1 08:26:51 2018
@@ -169,9 +169,17 @@ public final class DefaultSegmentWriterB
store.getBlobStore(),
cacheManager,
new WriteOperationHandler() {
+
+ @Override
+ @NotNull
+ public GCGeneration getGCGeneration() {
+ throw new UnsupportedOperationException("Cannot write to read-only store");
+ }
+
@NotNull
@Override
- public RecordId execute(@NotNull WriteOperation writeOperation) {
+ public RecordId execute(@NotNull GCGeneration gcGeneration,
+ @NotNull WriteOperation writeOperation) {
throw new UnsupportedOperationException("Cannot write to read-only store");
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java?rev=1845405&r1=1845404&r2=1845405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java Thu Nov 1 08:26:51 2018
@@ -156,12 +156,16 @@ public class SegmentBufferWriter impleme
@NotNull
@Override
- public RecordId execute(@NotNull WriteOperation writeOperation) throws IOException {
+ public RecordId execute(@NotNull GCGeneration gcGeneration,
+ @NotNull WriteOperation writeOperation)
+ throws IOException {
+ checkState(gcGeneration.equals(this.gcGeneration));
return writeOperation.execute(this);
}
+ @Override
@NotNull
- GCGeneration getGCGeneration() {
+ public GCGeneration getGCGeneration() {
return gcGeneration;
}
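
The checkState added above is the other half of the contract: a SegmentBufferWriter is only ever valid for the single generation it was created with, so a request carrying a stale generation fails fast instead of silently mixing generations in one segment. Illustrated with plain Java instead of Guava's checkState (stand-in types again):

    import java.io.IOException;

    final class GenerationPinnedWriterSketch {

        static final class GCGeneration {}
        static final class RecordId {}
        interface WriteOperation {
            RecordId execute(GenerationPinnedWriterSketch writer) throws IOException;
        }

        private final GCGeneration gcGeneration;

        GenerationPinnedWriterSketch(GCGeneration gcGeneration) {
            this.gcGeneration = gcGeneration;
        }

        RecordId execute(GCGeneration gcGeneration, WriteOperation operation)
                throws IOException {
            // Fail fast on a generation mismatch rather than writing a
            // record of one generation into a segment of another.
            if (!gcGeneration.equals(this.gcGeneration)) {
                throw new IllegalStateException("GC generation mismatch");
            }
            return operation.execute(this);
        }
    }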
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java?rev=1845405&r1=1845404&r2=1845405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java Thu Nov 1 08:26:51 2018
@@ -27,6 +27,7 @@ import static com.google.common.collect.
import static java.lang.Thread.currentThread;
import java.io.IOException;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -39,7 +40,7 @@ import org.jetbrains.annotations.NotNull
/**
* This {@link WriteOperationHandler} uses a pool of {@link SegmentBufferWriter}s,
- * which it passes to its {@link #execute(WriteOperation) execute} method.
+ * which it passes to its {@link #execute(GCGeneration, WriteOperation) execute} method.
* <p>
* Instances of this class are thread safe.
*/
@@ -67,11 +68,6 @@ public class SegmentBufferWriterPool imp
*/
private final Set<SegmentBufferWriter> disposed = newHashSet();
- /**
- * Retired writers that have not yet been flushed from a previous GC generation
- */
- private final Set<SegmentBufferWriter> disposedOldGen = newHashSet();
-
@NotNull
private final SegmentIdProvider idProvider;
@@ -97,14 +93,23 @@ public class SegmentBufferWriterPool imp
this.gcGeneration = checkNotNull(gcGeneration);
}
+ @Override
+ @NotNull
+ public GCGeneration getGCGeneration() {
+ return gcGeneration.get();
+ }
+
@NotNull
@Override
- public RecordId execute(@NotNull WriteOperation writeOperation) throws IOException {
- SegmentBufferWriter writer = borrowWriter(currentThread());
+ public RecordId execute(@NotNull GCGeneration gcGeneration,
+ @NotNull WriteOperation writeOperation)
+ throws IOException {
+ SimpleImmutableEntry<?,?> key = new SimpleImmutableEntry<>(currentThread(), gcGeneration);
+ SegmentBufferWriter writer = borrowWriter(key);
try {
return writeOperation.execute(writer);
} finally {
- returnWriter(currentThread(), writer);
+ returnWriter(key, writer);
}
}
@@ -120,11 +125,6 @@ public class SegmentBufferWriterPool imp
toFlush.addAll(writers.values());
writers.clear();
- // Collect all writers from old GC generations that
- // have been disposed
- toFlush.addAll(disposedOldGen);
- disposedOldGen.clear();
-
// Collect all borrowed writers, which we need to wait for.
// Clear the list so they will get disposed once returned.
toReturn.addAll(borrowed);
@@ -179,7 +179,7 @@ public class SegmentBufferWriterPool imp
monitor.enterWhen(guard);
return true;
} catch (InterruptedException ignore) {
- Thread.currentThread().interrupt();
+ currentThread().interrupt();
return false;
}
}
@@ -197,14 +197,6 @@ public class SegmentBufferWriterPool imp
writer = new SegmentBufferWriter(
idProvider,
reader,
- getWriterId(wid),
- gcGeneration.get()
- );
- } else if (!writer.getGCGeneration().equals(gcGeneration.get())) {
- disposedOldGen.add(writer);
- writer = new SegmentBufferWriter(
- idProvider,
- reader,
getWriterId(wid),
gcGeneration.get()
);
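
Keying the pool on the pair (current thread, requested generation) is what makes the disposedOldGen bookkeeping above redundant: once callers move on to a new generation, the old keys are simply never looked up again, and the stale writers drain through the normal flush path. The keying scheme in isolation (sketch; the real pool coordinates borrow/return with a monitor rather than a ConcurrentHashMap):

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class PoolKeyingSketch {

        static final class GCGeneration {}
        static final class Writer {}  // stand-in for SegmentBufferWriter

        private final Map<SimpleImmutableEntry<Thread, GCGeneration>, Writer> pool =
                new ConcurrentHashMap<>();

        Writer borrow(GCGeneration generation) {
            // One writer per (thread, generation) pair. When GC bumps the
            // generation, new keys no longer match old entries, so stale
            // writers are never handed out again.
            SimpleImmutableEntry<Thread, GCGeneration> key =
                    new SimpleImmutableEntry<>(Thread.currentThread(), generation);
            return pool.computeIfAbsent(key, k -> new Writer());
        }
    }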
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java?rev=1845405&r1=1845404&r2=1845405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java Thu Nov 1 08:26:51 2018
@@ -23,6 +23,8 @@ import java.io.IOException;
import org.jetbrains.annotations.NotNull;
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+
/**
* A {@code WriteOperationHandler} executes {@link WriteOperation
* WriteOperation}s and as such serves as a bridge between a {@link
@@ -31,6 +33,12 @@ import org.jetbrains.annotations.NotNull
interface WriteOperationHandler {
/**
+ * @return the current {@code GCGeneration} of the store.
+ */
+ @NotNull
+ GCGeneration getGCGeneration();
+
+ /**
* A {@code WriteOperation} encapsulates an operation on a {@link
* SegmentWriter}. Executing it performs the actual act of persisting
* changes to a {@link SegmentBufferWriter}.
@@ -50,12 +58,14 @@ interface WriteOperationHandler {
/**
* Execute the passed {@code writeOperation} by passing it a {@link SegmentBufferWriter}.
+ * @param gcGeneration the {@code GCGeneration} the changes should be persisted with.
* @param writeOperation {@link WriteOperation} to execute
* @return {@code RecordId} that resulted from persisting the changes.
* @throws IOException
*/
@NotNull
- RecordId execute(@NotNull WriteOperation writeOperation) throws IOException;
+ RecordId execute(@NotNull GCGeneration gcGeneration, @NotNull WriteOperation writeOperation)
+ throws IOException;
/**
* Flush any pending changes on any {@link SegmentBufferWriter} managed by this instance.
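
The practical consequence of the extended interface: a compound write reads getGCGeneration() once and passes the same value to every execute call, so all of its records land in segments of one generation even if a concurrent compaction bumps the store's generation midway through. A sketch (stand-in types, hypothetical writeHead/writeBody helpers):

    import java.io.IOException;

    final class GenerationConsistencySketch {

        static final class GCGeneration {}
        static final class RecordId {}
        static final class SegmentBufferWriter {}
        interface WriteOperation {
            RecordId execute(SegmentBufferWriter writer) throws IOException;
        }
        interface WriteOperationHandler {
            GCGeneration getGCGeneration();
            RecordId execute(GCGeneration generation, WriteOperation operation)
                    throws IOException;
        }

        static RecordId writeCompound(WriteOperationHandler handler) throws IOException {
            // Read the generation once, reuse it for every record of the
            // compound write: both records end up in the same generation.
            GCGeneration generation = handler.getGCGeneration();
            RecordId head = handler.execute(generation, w -> writeHead(w));
            return handler.execute(generation, w -> writeBody(w, head));
        }

        private static RecordId writeHead(SegmentBufferWriter writer) {
            return new RecordId();
        }

        private static RecordId writeBody(SegmentBufferWriter writer, RecordId head) {
            return new RecordId();
        }
    }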
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java?rev=1845405&r1=1845404&r2=1845405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java Thu Nov 1 08:26:51 2018
@@ -36,7 +36,6 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
-import com.google.common.base.Suppliers;
import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
@@ -50,11 +49,13 @@ public class SegmentBufferWriterPoolTest
private final RecordId rootId = store.getRevisions().getHead();
+ private GCGeneration gcGeneration = GCGeneration.NULL;
+
private final SegmentBufferWriterPool pool = new SegmentBufferWriterPool(
store.getSegmentIdProvider(),
store.getReader(),
"",
- Suppliers.ofInstance(GCGeneration.NULL)
+ () -> gcGeneration
);
private final ExecutorService[] executors = new ExecutorService[] {
@@ -69,11 +70,11 @@ public class SegmentBufferWriterPoolTest
}
}
- private Future<RecordId> execute(final WriteOperation op, int executor) {
+ private Future<RecordId> execute(GCGeneration gcGeneration, final WriteOperation op, int executor) {
return executors[executor].submit(new Callable<RecordId>() {
@Override
public RecordId call() throws Exception {
- return pool.execute(op);
+ return pool.execute(gcGeneration, op);
}
});
}
@@ -90,10 +91,11 @@ public class SegmentBufferWriterPoolTest
@Test
public void testThreadAffinity() throws IOException, ExecutionException, InterruptedException {
+ GCGeneration gen = pool.getGCGeneration();
ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
- Future<RecordId> res1 = execute(createOp("a", map1), 0);
- Future<RecordId> res2 = execute(createOp("b", map1), 1);
- Future<RecordId> res3 = execute(createOp("c", map1), 2);
+ Future<RecordId> res1 = execute(gen, createOp("a", map1), 0);
+ Future<RecordId> res2 = execute(gen, createOp("b", map1), 1);
+ Future<RecordId> res3 = execute(gen, createOp("c", map1), 2);
// Give the tasks some time to complete
sleepUninterruptibly(10, MILLISECONDS);
@@ -104,9 +106,9 @@ public class SegmentBufferWriterPoolTest
assertEquals(3, map1.size());
ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
- Future<RecordId> res4 = execute(createOp("a", map2), 0);
- Future<RecordId> res5 = execute(createOp("b", map2), 1);
- Future<RecordId> res6 = execute(createOp("c", map2), 2);
+ Future<RecordId> res4 = execute(gen, createOp("a", map2), 0);
+ Future<RecordId> res5 = execute(gen, createOp("b", map2), 1);
+ Future<RecordId> res6 = execute(gen, createOp("c", map2), 2);
// Give the tasks some time to complete
sleepUninterruptibly(10, MILLISECONDS);
@@ -120,10 +122,11 @@ public class SegmentBufferWriterPoolTest
@Test
public void testFlush() throws ExecutionException, InterruptedException, IOException {
+ GCGeneration gen = pool.getGCGeneration();
ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
- Future<RecordId> res1 = execute(createOp("a", map1), 0);
- Future<RecordId> res2 = execute(createOp("b", map1), 1);
- Future<RecordId> res3 = execute(createOp("c", map1), 2);
+ Future<RecordId> res1 = execute(gen, createOp("a", map1), 0);
+ Future<RecordId> res2 = execute(gen, createOp("b", map1), 1);
+ Future<RecordId> res3 = execute(gen, createOp("c", map1), 2);
// Give the tasks some time to complete
sleepUninterruptibly(10, MILLISECONDS);
@@ -136,9 +139,9 @@ public class SegmentBufferWriterPoolTest
pool.flush(store);
ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
- Future<RecordId> res4 = execute(createOp("a", map2), 0);
- Future<RecordId> res5 = execute(createOp("b", map2), 1);
- Future<RecordId> res6 = execute(createOp("c", map2), 2);
+ Future<RecordId> res4 = execute(gen, createOp("a", map2), 0);
+ Future<RecordId> res5 = execute(gen, createOp("b", map2), 1);
+ Future<RecordId> res6 = execute(gen, createOp("c", map2), 2);
// Give the tasks some time to complete
sleepUninterruptibly(10, MILLISECONDS);
@@ -151,10 +154,61 @@ public class SegmentBufferWriterPoolTest
}
@Test
+ public void testCompaction() throws ExecutionException, InterruptedException, IOException {
+ GCGeneration gen = pool.getGCGeneration();
+ ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
+ Future<RecordId> res1 = execute(gen, createOp("a", map1), 0);
+ Future<RecordId> res2 = execute(gen, createOp("b", map1), 1);
+ Future<RecordId> res3 = execute(gen, createOp("c", map1), 2);
+
+ // Give the tasks some time to complete
+ sleepUninterruptibly(10, MILLISECONDS);
+
+ assertEquals(rootId, res1.get());
+ assertEquals(rootId, res2.get());
+ assertEquals(rootId, res3.get());
+ assertEquals(3, map1.size());
+
+ // Simulate compaction by increasing the global gc generation
+ gcGeneration = gcGeneration.nextFull();
+
+ // Write using previous generation
+ ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
+ Future<RecordId> res4 = execute(gen, createOp("a", map2), 0);
+ Future<RecordId> res5 = execute(gen, createOp("b", map2), 1);
+ Future<RecordId> res6 = execute(gen, createOp("c", map2), 2);
+
+ // Give the tasks some time to complete
+ sleepUninterruptibly(10, MILLISECONDS);
+
+ assertEquals(rootId, res4.get());
+ assertEquals(rootId, res5.get());
+ assertEquals(rootId, res6.get());
+ assertEquals(3, map2.size());
+ assertEquals(map1, map2);
+
+ // Write using current generation
+ ConcurrentMap<String, SegmentBufferWriter> map3 = newConcurrentMap();
+ Future<RecordId> res7 = execute(gen.nextFull(), createOp("a", map3), 0);
+ Future<RecordId> res8 = execute(gen.nextFull(), createOp("b", map3), 1);
+ Future<RecordId> res9 = execute(gen.nextFull(), createOp("c", map3), 2);
+
+ // Give the tasks some time to complete
+ sleepUninterruptibly(10, MILLISECONDS);
+
+ assertEquals(rootId, res7.get());
+ assertEquals(rootId, res8.get());
+ assertEquals(rootId, res9.get());
+ assertEquals(3, map3.size());
+ assertTrue(intersection(newHashSet(map1.values()), newHashSet(map3.values())).isEmpty());
+ }
+
+ @Test
public void testFlushBlocks() throws ExecutionException, InterruptedException {
- Future<RecordId> res = execute(new WriteOperation() {
- @NotNull
- @Nullable @Override
+ GCGeneration gcGeneration = pool.getGCGeneration();
+ Future<RecordId> res = execute(gcGeneration, new WriteOperation() {
+ @Nullable
+ @Override
public RecordId execute(@NotNull SegmentBufferWriter writer) {
try {
// This should deadlock as flush waits for this write
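
The test now reads the generation through the lambda () -> gcGeneration instead of Suppliers.ofInstance(GCGeneration.NULL), so testCompaction can bump the field mid-test and the pool observes the change. The trick in isolation (sketch with a simplified value type, not the Oak GCGeneration):

    import java.util.function.Supplier;

    final class MutableGenerationSketch {

        static final class GCGeneration {
            final int full;
            GCGeneration(int full) { this.full = full; }
            GCGeneration nextFull() { return new GCGeneration(full + 1); }
        }

        // Read through a lambda, the pool under test always sees the
        // current field value; Suppliers.ofInstance would have frozen the
        // initial one forever.
        private GCGeneration gcGeneration = new GCGeneration(0);
        final Supplier<GCGeneration> generationSupplier = () -> gcGeneration;

        void simulateCompaction() {
            gcGeneration = gcGeneration.nextFull();
        }
    }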
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java?rev=1845405&r1=1845404&r2=1845405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java Thu Nov 1 08:26:51 2018
@@ -58,7 +58,6 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.jetbrains.annotations.NotNull;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -252,7 +251,6 @@ public class FileStoreIT {
}
}
- @Ignore("OAK-7867")
@Test
public void blockingBlob() throws Exception {
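
With flushes no longer routed through the writer a blocked stream is holding, the previously ignored blockingBlob test can run again. A test of this kind needs a binary whose stream stalls mid-read; a hedged sketch of such a fixture (illustrative only, not the actual stream used by FileStoreIT):

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.concurrent.CountDownLatch;

    // Serves a short prefix, then blocks until released, simulating a
    // binary whose source stalls mid-upload.
    final class BlockingInputStream extends InputStream {

        private final CountDownLatch unblock = new CountDownLatch(1);
        private int served = 0;

        @Override
        public int read() throws IOException {
            if (served < 16) {       // a few bytes arrive normally
                served++;
                return 0;
            }
            try {
                unblock.await();     // then the source hangs
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
            return -1;               // EOF once released
        }

        void release() {
            unblock.countDown();
        }
    }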