You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by ac...@apache.org on 2020/01/16 03:22:42 UTC
[jena] 01/01: Add semaphore to debug concurrent writes
This is an automated email from the ASF dual-hosted git repository.
acoburn pushed a commit to branch feature/debug_concurrent_writes
in repository https://gitbox.apache.org/repos/asf/jena.git
commit aa62f0508d0955406aa819d13e529296cdfd7b3b
Author: Aaron Coburn <ac...@apache.org>
AuthorDate: Wed Jan 15 22:22:00 2020 -0500
Add semaphore to debug concurrent writes
---
.../tdb2/store/nodetable/ThreadBufferingCache.java | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java
index b683671..ec64a51 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java
@@ -20,6 +20,7 @@ package org.apache.jena.tdb2.store.nodetable;
import java.util.Iterator;
import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
@@ -48,6 +49,7 @@ import org.apache.jena.tdb2.TDBException;
* The buffering is then cleared.
*/
public class ThreadBufferingCache<Key,Value> implements Cache<Key,Value> {
+ private final Semaphore scheduler = new Semaphore(1, true);
private final Cache<Key,Value> localCache;
private final Cache<Key,Value> baseCache;
private final AtomicReference<Thread> bufferingThread = new AtomicReference<>();
@@ -66,7 +68,7 @@ public class ThreadBufferingCache<Key,Value> implements Cache<Key,Value> {
if ( ! BUFFERING )
return false;
// Changes are sync'ed and the only way to change this value is via a sync'ed method.
- if ( bufferingThread == null )
+ if ( bufferingThread.get() == null )
return false;
Thread currentThread = Thread.currentThread();
return bufferingThread.get() == currentThread;
@@ -82,10 +84,15 @@ public class ThreadBufferingCache<Key,Value> implements Cache<Key,Value> {
public void enableBuffering() {
if ( ! BUFFERING )
return;
- Thread thread = Thread.currentThread();
- boolean b = bufferingThread.compareAndSet(null, thread);
- if ( !b ) {
- throw new TDBException(Lib.className(this)+": already buffering");
+ try {
+ scheduler.acquire();
+ Thread thread = Thread.currentThread();
+ boolean b = bufferingThread.compareAndSet(null, thread);
+ if ( !b ) {
+ throw new TDBException(Lib.className(this)+": already buffering");
+ }
+ } catch (final InterruptedException ex) {
+ throw new TDBException(Lib.className(this)+": already buffering, could not acquire thread");
}
}
@@ -104,6 +111,7 @@ public class ThreadBufferingCache<Key,Value> implements Cache<Key,Value> {
//System.out.println(label+": Flush:2 L: "+localCache().size());
//System.out.println(label+": Flush:2 M: "+baseCache.size());
bufferingThread.set(null);
+ scheduler.release();
}
/** Drop the local cache. */
@@ -114,6 +122,7 @@ public class ThreadBufferingCache<Key,Value> implements Cache<Key,Value> {
//System.out.println(label+": Drop: M: "+baseCache.size());
localCache().clear();
bufferingThread.set(null);
+ scheduler.release();
}
public Cache<Key, Value> getBuffer() {