You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by ac...@apache.org on 2020/01/16 03:22:42 UTC

[jena] 01/01: Add semaphore to debug concurrent writes

This is an automated email from the ASF dual-hosted git repository.

acoburn pushed a commit to branch feature/debug_concurrent_writes
in repository https://gitbox.apache.org/repos/asf/jena.git

commit aa62f0508d0955406aa819d13e529296cdfd7b3b
Author: Aaron Coburn <ac...@apache.org>
AuthorDate: Wed Jan 15 22:22:00 2020 -0500

    Add semaphore to debug concurrent writes
---
 .../tdb2/store/nodetable/ThreadBufferingCache.java    | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java
index b683671..ec64a51 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java
@@ -20,6 +20,7 @@ package org.apache.jena.tdb2.store.nodetable;
 
 import java.util.Iterator;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 
@@ -48,6 +49,7 @@ import org.apache.jena.tdb2.TDBException;
  * The buffering is then cleared.
  */
 public class ThreadBufferingCache<Key,Value> implements Cache<Key,Value> {
+    private final Semaphore scheduler = new Semaphore(1, true);
     private final Cache<Key,Value> localCache;
     private final Cache<Key,Value> baseCache;
     private final AtomicReference<Thread> bufferingThread = new AtomicReference<>();
@@ -66,7 +68,7 @@ public class ThreadBufferingCache<Key,Value> implements Cache<Key,Value> {
         if ( ! BUFFERING )
             return false;
         // Changes are sync'ed and the only way to change this value is via a sync'ed method.
-        if ( bufferingThread == null )
+        if ( bufferingThread.get() == null )
             return false;
         Thread currentThread = Thread.currentThread();
         return bufferingThread.get() == currentThread;
@@ -82,10 +84,15 @@ public class ThreadBufferingCache<Key,Value> implements Cache<Key,Value> {
     public void enableBuffering() {
         if ( ! BUFFERING )
             return;
-        Thread thread = Thread.currentThread();
-        boolean b = bufferingThread.compareAndSet(null, thread);
-        if ( !b ) {
-            throw new TDBException(Lib.className(this)+": already buffering");
+        try {
+            scheduler.acquire();
+            Thread thread = Thread.currentThread();
+            boolean b = bufferingThread.compareAndSet(null, thread);
+            if ( !b ) {
+                throw new TDBException(Lib.className(this)+": already buffering");
+            }
+        } catch (final InterruptedException ex) {
+            throw new TDBException(Lib.className(this)+": already buffering, could not acquire thread");
         }
     }
 
@@ -104,6 +111,7 @@ public class ThreadBufferingCache<Key,Value> implements Cache<Key,Value> {
         //System.out.println(label+": Flush:2 L: "+localCache().size());
         //System.out.println(label+": Flush:2 M: "+baseCache.size());
         bufferingThread.set(null);
+        scheduler.release();
     }
 
     /** Drop the local cache. */
@@ -114,6 +122,7 @@ public class ThreadBufferingCache<Key,Value> implements Cache<Key,Value> {
         //System.out.println(label+": Drop: M: "+baseCache.size());
         localCache().clear();
         bufferingThread.set(null);
+        scheduler.release();
     }
 
     public Cache<Key, Value> getBuffer() {