You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2016/08/17 16:01:42 UTC

[09/11] jena git commit: Tests for active writer when promote happens

Tests for active writer when promote happens


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/f972dde6
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/f972dde6
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/f972dde6

Branch: refs/heads/master
Commit: f972dde69beccefad2e01cf97cc7708917a309ab
Parents: f0d389d
Author: Andy Seaborne <an...@apache.org>
Authored: Sun Aug 14 20:54:52 2016 +0100
Committer: Andy Seaborne <an...@apache.org>
Committed: Sun Aug 14 20:54:52 2016 +0100

----------------------------------------------------------------------
 .../jena/tdb/transaction/TestTransPromote.java  | 99 +++++++++++++++++++-
 1 file changed, 96 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/f972dde6/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
index 2ae1ad1..43bc02a 100644
--- a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
+++ b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
@@ -21,10 +21,11 @@ package org.apache.jena.tdb.transaction ;
 import static org.junit.Assert.assertEquals ;
 import static org.junit.Assert.fail ;
 
-import java.util.concurrent.Semaphore ;
+import java.util.concurrent.* ;
 import java.util.concurrent.atomic.AtomicInteger ;
 
 import org.apache.jena.atlas.iterator.Iter ;
+import org.apache.jena.atlas.lib.Lib ;
 import org.apache.jena.query.ReadWrite ;
 import org.apache.jena.sparql.core.DatasetGraph ;
 import org.apache.jena.sparql.core.Quad ;
@@ -301,9 +302,10 @@ public class TestTransPromote {
     
     // Test whether an active writer causes a snapshot isolation
     // promotion to fail like it should.
-    // No equivalent read commiter version - it would block. 
+    // The promtion is on the test's thread.
+    // No equivalent "read commited" version - it would block. 
     @Test(expected=TDBTransactionException.class)
-    public void promote_clash_1() { 
+    public void promote_active_writer_0() { 
         DatasetGraphTransaction.readCommittedPromotion = false ;
         DatasetGraph dsg = create() ;
         Semaphore sema1 = new Semaphore(0) ;
@@ -326,7 +328,98 @@ public class TestTransPromote {
         // If snapshot, this will cause an exception due to the active writer.
         dsg.add(q1) ;
         fail("Should not be here") ;
+        sema2.release(1);
         dsg.commit() ;
         dsg.end() ;
     }
+    
+    @Test(expected=TDBTransactionException.class)
+    public void promote_active_writer_1() throws InterruptedException, ExecutionException {
+        // Active writer commits -> no promotion.
+        promote_active_writer(true) ;
+    }
+    
+    // Current implementation in TDB - the active writer causes "no promotion" even though it will abort.   
+    @Test(expected=TDBTransactionException.class)
+    public void promote_active_writer_2() throws InterruptedException, ExecutionException {
+        // Active writer aborts -> promotion possible (but not ipleemtned that way).
+        promote_active_writer(false) ;
+    }
+    private void promote_active_writer(boolean activeWriterCommit) {
+        ExecutorService executor = Executors.newFixedThreadPool(2) ;
+        try {
+            promote_clash_active_writer(executor, false) ;
+        } finally {
+            executor.shutdown();
+        }
+    }
+    
+    private void promote_clash_active_writer(ExecutorService executor, boolean activeWriterCommit) {
+        DatasetGraphTransaction.readCommittedPromotion = false ;
+        Semaphore semaActiveWriterStart     = new Semaphore(0) ;
+        Semaphore semaActiveWriterContinue  = new Semaphore(0) ;
+        Semaphore semaPromoteTxnStart       = new Semaphore(0) ;
+        Semaphore semaPromoteTxnContinue    = new Semaphore(0) ;
+
+        DatasetGraph dsg = create() ;
+
+        // The "active writer". 
+        Callable<Object> activeWriter = ()->{
+            dsg.begin(ReadWrite.WRITE) ;
+            semaActiveWriterStart.release(1) ;
+            // (*1)
+            semaActiveWriterContinue.acquireUninterruptibly(1) ;
+            if ( activeWriterCommit )
+                dsg.commit() ;
+            else
+                dsg.abort();
+            dsg.end() ;
+            return null ;
+        } ;
+
+        Future<Object> activeWriterFuture = executor.submit(activeWriter) ;
+        // Advance "active writer" to (*1), inside a write transaction and waiting.
+        // The transaction has been created and started.
+        semaActiveWriterStart.acquireUninterruptibly(); 
+
+        Callable<TDBTransactionException> attemptedPromote = ()->{
+            dsg.begin(ReadWrite.READ) ;
+            semaPromoteTxnStart.release(1) ;
+            // (*2)
+            semaPromoteTxnContinue.acquireUninterruptibly();
+            try { 
+                // (*3)
+                dsg.add(q1) ;
+                //System.err.println("PROMOTED");
+                return null ;
+            } catch (TDBTransactionException e) {
+                //System.err.println("NOT PROMOTED");
+                return e ;
+            }
+        } ;
+
+        Future<TDBTransactionException> attemptedPromoteFuture = executor.submit(attemptedPromote) ;
+        // Advance "attempted promote" to (*2), inside a read transaction, before attempting a promoting write.
+        // The transaction has been created and started.
+        semaPromoteTxnStart.acquireUninterruptibly();
+        
+        // Advance "attempted promote" allowing it to go (*3) where it blocks
+        // This may happen at any time - as soon as it does, the "attempted promote" blocks.
+        semaPromoteTxnContinue.release(1);
+        // I don't know of a better way to ensure "attempted promote" is blocked. 
+        
+        Lib.sleep(100) ;
+        // Let the active writer go.
+        semaActiveWriterContinue.release(1);
+        
+        try { 
+            // Collect the active writer.
+            activeWriterFuture.get();
+            
+            // (Ideal) and the attempted promotion should advance if the active writer aborts.
+            TDBTransactionException e = attemptedPromoteFuture.get() ;
+            if ( e != null )
+                throw e ;
+        } catch (InterruptedException | ExecutionException e1) { throw new RuntimeException(e1) ; }
+    }
 }