You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2018/01/04 20:46:27 UTC

[13/14] jena git commit: JENA-1458: Update ThreadTxn and associated tests

JENA-1458: Update ThreadTxn and associated tests


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/ddfb3e45
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/ddfb3e45
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/ddfb3e45

Branch: refs/heads/master
Commit: ddfb3e453d04467e4f04286c72befc4d0ffb3f1a
Parents: 87ab2a7
Author: Andy Seaborne <an...@apache.org>
Authored: Wed Jan 3 17:10:06 2018 +0000
Committer: Andy Seaborne <an...@apache.org>
Committed: Wed Jan 3 17:10:06 2018 +0000

----------------------------------------------------------------------
 .../java/org/apache/jena/system/ThreadTxn.java  |  35 +++--
 .../main/java/org/apache/jena/system/Txn.java   |   4 +-
 .../java/org/apache/jena/system/TxnCounter.java |  13 +-
 .../java/org/apache/jena/system/TestTxn.java    | 139 +++++++++++++++++--
 .../jena/dboe/transaction/TransInteger.java     |   6 +
 .../jena/dboe/transaction/TestTxnLib.java       |   8 +-
 6 files changed, 169 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/ddfb3e45/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java b/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java
index 0b73821..93aa9bf 100644
--- a/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java
+++ b/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java
@@ -18,7 +18,7 @@
 
 package org.apache.jena.system;
 
-import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.query.TxnType;
 import org.apache.jena.sparql.core.Transactional ;
 
 /**
@@ -33,11 +33,19 @@ import org.apache.jena.sparql.core.Transactional ;
  */ 
 public class ThreadTxn {
 
+    /** Create a thread-backed delayed transaction action. 
+     * Call {@link ThreadAction#run} to perform the read transaction.
+     */
+    public static ThreadAction threadTxn(Transactional trans, TxnType txnType, Runnable action) {
+        return create(trans, txnType, action, true, true) ;
+    }
+
+
     /** Create a thread-backed delayed READ transaction action. 
      * Call {@link ThreadAction#run} to perform the read transaction.
      */
     public static ThreadAction threadTxnRead(Transactional trans, Runnable action) {
-        return create(trans, ReadWrite.READ, action, false) ;
+        return threadTxn(trans, TxnType.READ, action) ;
     }
 
     /** Create a thread-backed delayed WRITE action.
@@ -46,30 +54,33 @@ public class ThreadTxn {
      * this will deadlock.)
      */
     public static ThreadAction threadTxnWrite(Transactional trans, Runnable action) {
-        return create(trans, ReadWrite.WRITE, action, true) ;
+        return threadTxn(trans, TxnType.WRITE, action) ;
     }
    
     /** Create a thread-backed delayed WRITE-abort action (mainly for testing). */
     public static ThreadAction threadTxnWriteAbort(Transactional trans, Runnable action) {
-        return create(trans, ReadWrite.WRITE, action, false) ;
+        return create(trans, TxnType.WRITE, action, true, false) ;
     }
 
-    /*package*/ static ThreadAction create(Transactional trans, ReadWrite mode, Runnable action, boolean isCommit) {
+    private static ThreadAction create(Transactional trans, TxnType txnType, Runnable action, boolean isCommitBefore,  boolean isCommitAfter) {
         return ThreadAction.create
-            ( beforeAction(trans, mode, isCommit)
+            ( beforeAction(trans, txnType, isCommitBefore)
             , action
-            , afterAction(trans, mode, isCommit) ) ;
+            , afterAction(trans, txnType, isCommitAfter) ) ;
     }
     
-    private static Runnable beforeAction(Transactional trans, ReadWrite mode, boolean isCommit) {
-        return ()-> trans.begin(mode) ;
+    private static Runnable beforeAction(Transactional trans, TxnType txnType, boolean isCommit) {
+        return ()-> trans.begin(txnType) ;
     }
     
-    private static Runnable afterAction(Transactional trans, ReadWrite mode, boolean isCommit) {
+    private static Runnable afterAction(Transactional trans, TxnType txnType, boolean isCommit) {
         return () -> {
             // Finish transaction (if no throwable)
-            switch (mode) {
-                case WRITE : {
+            switch (txnType) {
+                case WRITE :
+                case READ_COMMITTED_PROMOTE :
+                case READ_PROMOTE :
+                {
                     if ( isCommit )
                         trans.commit() ;
                     else

http://git-wip-us.apache.org/repos/asf/jena/blob/ddfb3e45/jena-arq/src/main/java/org/apache/jena/system/Txn.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/system/Txn.java b/jena-arq/src/main/java/org/apache/jena/system/Txn.java
index a0ad64f..5a92a04 100644
--- a/jena-arq/src/main/java/org/apache/jena/system/Txn.java
+++ b/jena-arq/src/main/java/org/apache/jena/system/Txn.java
@@ -46,7 +46,7 @@ public class Txn {
      * change from "read" to "write"; the {@link Transactional#promote promote} method
      * returns a boolean indicating whether the promotion was possible or not. 
      */
-    public static <T extends Transactional> void exec(T txn, Runnable r) {
+    public static <T extends Transactional> void execute(T txn, Runnable r) {
         exec(txn, TxnType.READ_PROMOTE, r);
     }
 
@@ -64,7 +64,7 @@ public class Txn {
      * change from "read" to "write"; the {@link Transactional#promote promote} method
      * returns a boolean indicating whether the promotion was possible or not. 
      */
-    public static <T extends Transactional, X> X calc(T txn, Supplier<X> r) {
+    public static <T extends Transactional, X> X calculate(T txn, Supplier<X> r) {
         return calc(txn, TxnType.READ_PROMOTE, r);
     }
 

http://git-wip-us.apache.org/repos/asf/jena/blob/ddfb3e45/jena-arq/src/main/java/org/apache/jena/system/TxnCounter.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/system/TxnCounter.java b/jena-arq/src/main/java/org/apache/jena/system/TxnCounter.java
index e641d55..701e95a 100644
--- a/jena-arq/src/main/java/org/apache/jena/system/TxnCounter.java
+++ b/jena-arq/src/main/java/org/apache/jena/system/TxnCounter.java
@@ -73,7 +73,7 @@ public class TxnCounter implements Transactional {
     // The kind of transaction.
     private ThreadLocal<ReadWrite>    transactionMode  = ThreadLocal.withInitial(()->null);
     private ThreadLocal<TxnType>      transactionType  = ThreadLocal.withInitial(()->null);
-    private ThreadLocal<Long>         transactionEpoch         = ThreadLocal.withInitial(()->null);
+    private ThreadLocal<Long>         transactionEpoch = ThreadLocal.withInitial(()->null);
     
     // Synchronization for making changes.  
     private Object txnLifecycleLock   = new Object(); 
@@ -108,8 +108,8 @@ public class TxnCounter implements Transactional {
         synchronized(txnLifecycleLock) {
             if ( transactionMode.get() != null )
                 throw new JenaTransactionException("Already in a transaction");
-            long thisEpoch = epoch.incrementAndGet(); 
-            transactionEpoch.set(thisEpoch) ;
+            // Set transaction to current epoch - writes advance this in commit().
+            transactionEpoch.set(epoch.get()) ;
             IntegerState state = new IntegerState(value.get());
             transactionValue.set(state);
             transactionMode.set(TxnType.initial(txnType));
@@ -132,7 +132,7 @@ public class TxnCounter implements Transactional {
             transactionValue.set(state);
             return true;
         }
-        // READ no committed.
+        // READ_PROMOTE
         acquireWriterLock(true);
         synchronized(txnLifecycleLock) {
             long nowEpoch = epoch.get();
@@ -146,13 +146,14 @@ public class TxnCounter implements Transactional {
         }
         return true;
     }
-
     
     @Override
     public void commit() {
         checkTxn(); 
         if ( isWriteTxn() ) {
-            // Set global.
+            // Theer is only one writer - we are inside the writer lock. 
+            // Advance the epoch.
+            long thisEpoch = epoch.incrementAndGet();
             value.set(getDataState().txnValue);
             transactionValue.set(null);
             releaseWriterLock();

http://git-wip-us.apache.org/repos/asf/jena/blob/ddfb3e45/jena-arq/src/test/java/org/apache/jena/system/TestTxn.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/system/TestTxn.java b/jena-arq/src/test/java/org/apache/jena/system/TestTxn.java
index 2ef2718..668efb5 100644
--- a/jena-arq/src/test/java/org/apache/jena/system/TestTxn.java
+++ b/jena-arq/src/test/java/org/apache/jena/system/TestTxn.java
@@ -18,10 +18,15 @@
 
 package org.apache.jena.system;
 
-import static org.junit.Assert.assertEquals ;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
-import org.apache.jena.system.Txn ;
-import org.apache.jena.system.TxnCounter ;
+import org.apache.jena.query.ReadWrite;
+import org.apache.jena.query.TxnType;
+import org.apache.jena.sparql.JenaTransactionException;
+import org.apache.jena.sparql.core.DatasetGraphFactory;
+import org.apache.jena.sparql.core.Transactional;
 import org.junit.Test ;
 
 public class TestTxn {
@@ -93,7 +98,7 @@ public class TestTxn {
             }) ;
         assertEquals("Outside W",1, x) ;
     }
-    
+
     @Test public void txn_write_03() {
         Txn.executeWrite(counter, () -> {
             counter.inc() ;
@@ -131,7 +136,7 @@ public class TestTxn {
             assertEquals("In R, get()", 1, counter.get()) ;
         }) ;
     }
-    
+
     @Test public void txn_rw_2() {
         Txn.executeRead(counter, () -> {
             assertEquals("In R, value()", 0, counter.value()) ;
@@ -152,7 +157,7 @@ public class TestTxn {
             assertEquals("In R, get()", 1, counter.get()) ;
         }) ;
     }
-    
+
     @Test public void txn_continue_1() {
         Txn.executeWrite(counter, ()->counter.set(91)) ;
         
@@ -167,7 +172,7 @@ public class TestTxn {
             });
         assertEquals(92,counter.value()) ;
     }
-    
+
     @Test public void txn_continue_2() {
         Txn.executeWrite(counter, ()->counter.set(91)) ;
         
@@ -186,7 +191,7 @@ public class TestTxn {
             });
         assertEquals(94,counter.value()) ;
     }
-    
+
     @Test(expected=ExceptionFromTest.class)
     public void txn_exception_01() {
         Txn.executeWrite(counter, counter::inc) ;
@@ -198,7 +203,7 @@ public class TestTxn {
             throw new ExceptionFromTest() ;
         }) ;
     }
-    
+
     @Test
     public void txn_exception_02() {
         Txn.executeWrite(counter, ()->counter.set(8)) ;
@@ -218,7 +223,7 @@ public class TestTxn {
     @Test
     public void txn_exception_03() {
         Txn.executeWrite(counter, ()->counter.set(9)) ;
-    
+
         try {
             Txn.executeRead(counter, () -> {
                 assertEquals("In W, value()", 9, counter.value());
@@ -229,9 +234,121 @@ public class TestTxn {
         catch (ExceptionFromTest ex) {}
         assertEquals("After W/abort, get()", 9, counter.get());
     }
+
+    @Test
+    public void txn_nested_01() {
+        Txn.exec(counter, TxnType.READ, ()->{
+            Txn.exec(counter, TxnType.READ, ()->{});
+        });
+    }
+
+    @Test(expected=JenaTransactionException.class)
+    public void txn_nested_02() {
+        Txn.exec(counter, TxnType.READ, ()->{
+            Txn.exec(counter, TxnType.WRITE, ()->{});
+        });
+    }
+
+    @Test(expected=JenaTransactionException.class)
+    public void txn_nested_03() {
+        Txn.exec(counter, TxnType.WRITE, ()->{
+            // Must the same type to nest Txn.
+            Txn.exec(counter, TxnType.READ, ()->{});
+        });
+    }
+
+    @Test
+    public void txn_nested_04() {
+        Txn.exec(counter, TxnType.READ_PROMOTE, ()->{
+            boolean b = counter.promote();
+            assertTrue(b);
+            // Must the same type to nest Txn.
+            Txn.exec(counter, TxnType.READ_PROMOTE, ()->{});
+        });
+    }
+
+    @Test
+    public void txn_nested_05() {
+        Txn.exec(counter, TxnType.READ_PROMOTE, ()->{
+            boolean b = counter.promote();
+            assertTrue(b);
+            assertEquals(ReadWrite.WRITE, counter.transactionMode());
+            // Must the same type to nest Txn.
+            Txn.exec(counter, TxnType.READ_PROMOTE, ()->{});
+        });
+    }
+
+    @Test(expected=JenaTransactionException.class)
+    public void txn_nested_06() {
+        Txn.exec(counter, TxnType.READ_PROMOTE, ()->{
+            boolean b = counter.promote();
+            assertTrue(b);
+            assertEquals(ReadWrite.WRITE, counter.transactionMode());
+            // Must the same type to nest Txn.
+            Txn.exec(counter, TxnType.WRITE, ()->{});
+        });
+    }
+
+    @Test
+    public void txn_threaded_01() {
+        Txn.exec(counter, TxnType.READ_PROMOTE, ()->{
+            ThreadAction a = ThreadTxn.threadTxnWrite(counter, ()->{});
+            a.run();
+            // Blocks promotion.
+            boolean b = counter.promote();
+            assertFalse(b);
+            assertEquals(ReadWrite.READ, counter.transactionMode());
+        });
+    }
+
+//    // This would lock up.
+//    public void txn_threaded_Not_A_Test() {
+//        Txn.exec(counter, TxnType.READ_PROMOTE, ()->{
+//            ThreadAction a = ThreadTxn.threadTxnWrite(counter, ()->{});
+//            // a is in a W transaction but has not committed or aborted - it's paused.
+//            boolean b = counter.promote();
+//            // Never reach here.
+//            a.run();
+//        });
+//    }
+
+    @Test
+    public void txn_threaded_02() {
+        //Transactional tx = DatasetGraphFactory.createTxnMem();
+        Transactional tx = counter; 
+        
+        // Start and enter the W transaction.
+        ThreadAction a = ThreadTxn.threadTxnWrite(tx, ()->{});
+
+        // ThreadAction started ... in W transaction.
+        Txn.exec(tx, TxnType.READ_PROMOTE, ()->{
+            // ... have the thread action complete.
+            a.run(); 
+            // Blocks promotion.
+            boolean b = tx.promote();
+            assertFalse(b);
+            assertEquals(ReadWrite.READ, tx.transactionMode());
+        });
+    }
     
-    // Tests for thread transactions.
+    @Test
+    public void txn_threaded_03() {
+        Transactional tx = DatasetGraphFactory.createTxnMem();
+        //Transactional tx = counter; 
+        
+        // Start and enter the W transaction.
+        ThreadAction a = ThreadTxn.threadTxnWriteAbort(tx, ()->{});
 
+        // ThreadAction started ... in W transaction.
+        Txn.exec(tx, TxnType.READ_PROMOTE, ()->{
+            // ... have the thread action abort..
+            a.run(); 
+            // Does not block promotion.
+            boolean b = tx.promote();
+            assertTrue(b);
+            assertEquals(ReadWrite.WRITE, tx.transactionMode());
+        });
+    }
 
 }
 

http://git-wip-us.apache.org/repos/asf/jena/blob/ddfb3e45/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransInteger.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransInteger.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransInteger.java
index 5ef6791..3a44301 100644
--- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransInteger.java
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransInteger.java
@@ -207,6 +207,8 @@ public class TransInteger extends TransactionalComponentLifecycle<TransInteger.I
 
     @Override
     protected ByteBuffer _commitPrepare(TxnId txnId, IntegerState state) {
+        if ( isReadTxn() )
+            return null;
         ByteBuffer x = ByteBuffer.allocate(Long.BYTES) ;
         x.putLong(state.txnValue) ;
         return x ;
@@ -214,11 +216,15 @@ public class TransInteger extends TransactionalComponentLifecycle<TransInteger.I
 
     @Override
     protected void _commit(TxnId txnId, IntegerState state) {
+        if ( isReadTxn() )
+            return ;
         writeLocation(state.txnValue) ;
     }
 
     @Override
     protected void _commitEnd(TxnId txnId, IntegerState state) {
+        if ( isReadTxn() )
+            return ;
         value.set(state.txnValue) ;
     }
     @Override

http://git-wip-us.apache.org/repos/asf/jena/blob/ddfb3e45/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnLib.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnLib.java b/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnLib.java
index a4e39dc..219f77a 100644
--- a/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnLib.java
+++ b/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnLib.java
@@ -28,6 +28,7 @@ import org.apache.jena.system.Txn;
 import org.apache.jena.query.ReadWrite ;
 import org.junit.Test ;
 
+/** Txn with DBOE transactions */ 
 public class TestTxnLib extends AbstractTestTxn {
 
     @Test public void libTxn_1() {
@@ -184,7 +185,7 @@ public class TestTxnLib extends AbstractTestTxn {
     }
 
     @Test public void libTxnThread_12() {
-        long x1 = counter1.get() ;  
+        long x1 = counter1.get() ; 
         ThreadAction t = ThreadTxn.threadTxnRead(unit, () -> {
             long z1 = counter1.get() ;
             assertEquals("Thread", x1, z1) ;
@@ -192,9 +193,6 @@ public class TestTxnLib extends AbstractTestTxn {
         Txn.executeWrite(unit, ()->counter1.inc()) ;
         t.run() ;
         long x2 = counter1.get() ;
-        assertEquals("after", x1+1, x2) ;
+        assertEquals("after::", x1+1, x2) ;
     }
-
 }
-
- 
\ No newline at end of file