You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2018/01/04 20:46:27 UTC
[13/14] jena git commit: JENA-1458: Update ThreadTxn and associated
tests
JENA-1458: Update ThreadTxn and associated tests
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/ddfb3e45
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/ddfb3e45
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/ddfb3e45
Branch: refs/heads/master
Commit: ddfb3e453d04467e4f04286c72befc4d0ffb3f1a
Parents: 87ab2a7
Author: Andy Seaborne <an...@apache.org>
Authored: Wed Jan 3 17:10:06 2018 +0000
Committer: Andy Seaborne <an...@apache.org>
Committed: Wed Jan 3 17:10:06 2018 +0000
----------------------------------------------------------------------
.../java/org/apache/jena/system/ThreadTxn.java | 35 +++--
.../main/java/org/apache/jena/system/Txn.java | 4 +-
.../java/org/apache/jena/system/TxnCounter.java | 13 +-
.../java/org/apache/jena/system/TestTxn.java | 139 +++++++++++++++++--
.../jena/dboe/transaction/TransInteger.java | 6 +
.../jena/dboe/transaction/TestTxnLib.java | 8 +-
6 files changed, 169 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/ddfb3e45/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java b/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java
index 0b73821..93aa9bf 100644
--- a/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java
+++ b/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java
@@ -18,7 +18,7 @@
package org.apache.jena.system;
-import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.query.TxnType;
import org.apache.jena.sparql.core.Transactional ;
/**
@@ -33,11 +33,19 @@ import org.apache.jena.sparql.core.Transactional ;
*/
public class ThreadTxn {
+ /** Create a thread-backed delayed transaction action.
+ * Call {@link ThreadAction#run} to perform the read transaction.
+ */
+ public static ThreadAction threadTxn(Transactional trans, TxnType txnType, Runnable action) {
+ return create(trans, txnType, action, true, true) ;
+ }
+
+
/** Create a thread-backed delayed READ transaction action.
* Call {@link ThreadAction#run} to perform the read transaction.
*/
public static ThreadAction threadTxnRead(Transactional trans, Runnable action) {
- return create(trans, ReadWrite.READ, action, false) ;
+ return threadTxn(trans, TxnType.READ, action) ;
}
/** Create a thread-backed delayed WRITE action.
@@ -46,30 +54,33 @@ public class ThreadTxn {
* this will deadlock.)
*/
public static ThreadAction threadTxnWrite(Transactional trans, Runnable action) {
- return create(trans, ReadWrite.WRITE, action, true) ;
+ return threadTxn(trans, TxnType.WRITE, action) ;
}
/** Create a thread-backed delayed WRITE-abort action (mainly for testing). */
public static ThreadAction threadTxnWriteAbort(Transactional trans, Runnable action) {
- return create(trans, ReadWrite.WRITE, action, false) ;
+ return create(trans, TxnType.WRITE, action, true, false) ;
}
- /*package*/ static ThreadAction create(Transactional trans, ReadWrite mode, Runnable action, boolean isCommit) {
+ private static ThreadAction create(Transactional trans, TxnType txnType, Runnable action, boolean isCommitBefore, boolean isCommitAfter) {
return ThreadAction.create
- ( beforeAction(trans, mode, isCommit)
+ ( beforeAction(trans, txnType, isCommitBefore)
, action
- , afterAction(trans, mode, isCommit) ) ;
+ , afterAction(trans, txnType, isCommitAfter) ) ;
}
- private static Runnable beforeAction(Transactional trans, ReadWrite mode, boolean isCommit) {
- return ()-> trans.begin(mode) ;
+ private static Runnable beforeAction(Transactional trans, TxnType txnType, boolean isCommit) {
+ return ()-> trans.begin(txnType) ;
}
- private static Runnable afterAction(Transactional trans, ReadWrite mode, boolean isCommit) {
+ private static Runnable afterAction(Transactional trans, TxnType txnType, boolean isCommit) {
return () -> {
// Finish transaction (if no throwable)
- switch (mode) {
- case WRITE : {
+ switch (txnType) {
+ case WRITE :
+ case READ_COMMITTED_PROMOTE :
+ case READ_PROMOTE :
+ {
if ( isCommit )
trans.commit() ;
else
http://git-wip-us.apache.org/repos/asf/jena/blob/ddfb3e45/jena-arq/src/main/java/org/apache/jena/system/Txn.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/system/Txn.java b/jena-arq/src/main/java/org/apache/jena/system/Txn.java
index a0ad64f..5a92a04 100644
--- a/jena-arq/src/main/java/org/apache/jena/system/Txn.java
+++ b/jena-arq/src/main/java/org/apache/jena/system/Txn.java
@@ -46,7 +46,7 @@ public class Txn {
* change from "read" to "write"; the {@link Transactional#promote promote} method
* returns a boolean indicating whether the promotion was possible or not.
*/
- public static <T extends Transactional> void exec(T txn, Runnable r) {
+ public static <T extends Transactional> void execute(T txn, Runnable r) {
exec(txn, TxnType.READ_PROMOTE, r);
}
@@ -64,7 +64,7 @@ public class Txn {
* change from "read" to "write"; the {@link Transactional#promote promote} method
* returns a boolean indicating whether the promotion was possible or not.
*/
- public static <T extends Transactional, X> X calc(T txn, Supplier<X> r) {
+ public static <T extends Transactional, X> X calculate(T txn, Supplier<X> r) {
return calc(txn, TxnType.READ_PROMOTE, r);
}
http://git-wip-us.apache.org/repos/asf/jena/blob/ddfb3e45/jena-arq/src/main/java/org/apache/jena/system/TxnCounter.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/system/TxnCounter.java b/jena-arq/src/main/java/org/apache/jena/system/TxnCounter.java
index e641d55..701e95a 100644
--- a/jena-arq/src/main/java/org/apache/jena/system/TxnCounter.java
+++ b/jena-arq/src/main/java/org/apache/jena/system/TxnCounter.java
@@ -73,7 +73,7 @@ public class TxnCounter implements Transactional {
// The kind of transaction.
private ThreadLocal<ReadWrite> transactionMode = ThreadLocal.withInitial(()->null);
private ThreadLocal<TxnType> transactionType = ThreadLocal.withInitial(()->null);
- private ThreadLocal<Long> transactionEpoch = ThreadLocal.withInitial(()->null);
+ private ThreadLocal<Long> transactionEpoch = ThreadLocal.withInitial(()->null);
// Synchronization for making changes.
private Object txnLifecycleLock = new Object();
@@ -108,8 +108,8 @@ public class TxnCounter implements Transactional {
synchronized(txnLifecycleLock) {
if ( transactionMode.get() != null )
throw new JenaTransactionException("Already in a transaction");
- long thisEpoch = epoch.incrementAndGet();
- transactionEpoch.set(thisEpoch) ;
+ // Set transaction to current epoch - writes advance this in commit().
+ transactionEpoch.set(epoch.get()) ;
IntegerState state = new IntegerState(value.get());
transactionValue.set(state);
transactionMode.set(TxnType.initial(txnType));
@@ -132,7 +132,7 @@ public class TxnCounter implements Transactional {
transactionValue.set(state);
return true;
}
- // READ no committed.
+ // READ_PROMOTE
acquireWriterLock(true);
synchronized(txnLifecycleLock) {
long nowEpoch = epoch.get();
@@ -146,13 +146,14 @@ public class TxnCounter implements Transactional {
}
return true;
}
-
@Override
public void commit() {
checkTxn();
if ( isWriteTxn() ) {
- // Set global.
+ // Theer is only one writer - we are inside the writer lock.
+ // Advance the epoch.
+ long thisEpoch = epoch.incrementAndGet();
value.set(getDataState().txnValue);
transactionValue.set(null);
releaseWriterLock();
http://git-wip-us.apache.org/repos/asf/jena/blob/ddfb3e45/jena-arq/src/test/java/org/apache/jena/system/TestTxn.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/system/TestTxn.java b/jena-arq/src/test/java/org/apache/jena/system/TestTxn.java
index 2ef2718..668efb5 100644
--- a/jena-arq/src/test/java/org/apache/jena/system/TestTxn.java
+++ b/jena-arq/src/test/java/org/apache/jena/system/TestTxn.java
@@ -18,10 +18,15 @@
package org.apache.jena.system;
-import static org.junit.Assert.assertEquals ;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
-import org.apache.jena.system.Txn ;
-import org.apache.jena.system.TxnCounter ;
+import org.apache.jena.query.ReadWrite;
+import org.apache.jena.query.TxnType;
+import org.apache.jena.sparql.JenaTransactionException;
+import org.apache.jena.sparql.core.DatasetGraphFactory;
+import org.apache.jena.sparql.core.Transactional;
import org.junit.Test ;
public class TestTxn {
@@ -93,7 +98,7 @@ public class TestTxn {
}) ;
assertEquals("Outside W",1, x) ;
}
-
+
@Test public void txn_write_03() {
Txn.executeWrite(counter, () -> {
counter.inc() ;
@@ -131,7 +136,7 @@ public class TestTxn {
assertEquals("In R, get()", 1, counter.get()) ;
}) ;
}
-
+
@Test public void txn_rw_2() {
Txn.executeRead(counter, () -> {
assertEquals("In R, value()", 0, counter.value()) ;
@@ -152,7 +157,7 @@ public class TestTxn {
assertEquals("In R, get()", 1, counter.get()) ;
}) ;
}
-
+
@Test public void txn_continue_1() {
Txn.executeWrite(counter, ()->counter.set(91)) ;
@@ -167,7 +172,7 @@ public class TestTxn {
});
assertEquals(92,counter.value()) ;
}
-
+
@Test public void txn_continue_2() {
Txn.executeWrite(counter, ()->counter.set(91)) ;
@@ -186,7 +191,7 @@ public class TestTxn {
});
assertEquals(94,counter.value()) ;
}
-
+
@Test(expected=ExceptionFromTest.class)
public void txn_exception_01() {
Txn.executeWrite(counter, counter::inc) ;
@@ -198,7 +203,7 @@ public class TestTxn {
throw new ExceptionFromTest() ;
}) ;
}
-
+
@Test
public void txn_exception_02() {
Txn.executeWrite(counter, ()->counter.set(8)) ;
@@ -218,7 +223,7 @@ public class TestTxn {
@Test
public void txn_exception_03() {
Txn.executeWrite(counter, ()->counter.set(9)) ;
-
+
try {
Txn.executeRead(counter, () -> {
assertEquals("In W, value()", 9, counter.value());
@@ -229,9 +234,121 @@ public class TestTxn {
catch (ExceptionFromTest ex) {}
assertEquals("After W/abort, get()", 9, counter.get());
}
+
+ @Test
+ public void txn_nested_01() {
+ Txn.exec(counter, TxnType.READ, ()->{
+ Txn.exec(counter, TxnType.READ, ()->{});
+ });
+ }
+
+ @Test(expected=JenaTransactionException.class)
+ public void txn_nested_02() {
+ Txn.exec(counter, TxnType.READ, ()->{
+ Txn.exec(counter, TxnType.WRITE, ()->{});
+ });
+ }
+
+ @Test(expected=JenaTransactionException.class)
+ public void txn_nested_03() {
+ Txn.exec(counter, TxnType.WRITE, ()->{
+ // Must the same type to nest Txn.
+ Txn.exec(counter, TxnType.READ, ()->{});
+ });
+ }
+
+ @Test
+ public void txn_nested_04() {
+ Txn.exec(counter, TxnType.READ_PROMOTE, ()->{
+ boolean b = counter.promote();
+ assertTrue(b);
+ // Must the same type to nest Txn.
+ Txn.exec(counter, TxnType.READ_PROMOTE, ()->{});
+ });
+ }
+
+ @Test
+ public void txn_nested_05() {
+ Txn.exec(counter, TxnType.READ_PROMOTE, ()->{
+ boolean b = counter.promote();
+ assertTrue(b);
+ assertEquals(ReadWrite.WRITE, counter.transactionMode());
+ // Must the same type to nest Txn.
+ Txn.exec(counter, TxnType.READ_PROMOTE, ()->{});
+ });
+ }
+
+ @Test(expected=JenaTransactionException.class)
+ public void txn_nested_06() {
+ Txn.exec(counter, TxnType.READ_PROMOTE, ()->{
+ boolean b = counter.promote();
+ assertTrue(b);
+ assertEquals(ReadWrite.WRITE, counter.transactionMode());
+ // Must the same type to nest Txn.
+ Txn.exec(counter, TxnType.WRITE, ()->{});
+ });
+ }
+
+ @Test
+ public void txn_threaded_01() {
+ Txn.exec(counter, TxnType.READ_PROMOTE, ()->{
+ ThreadAction a = ThreadTxn.threadTxnWrite(counter, ()->{});
+ a.run();
+ // Blocks promotion.
+ boolean b = counter.promote();
+ assertFalse(b);
+ assertEquals(ReadWrite.READ, counter.transactionMode());
+ });
+ }
+
+// // This would lock up.
+// public void txn_threaded_Not_A_Test() {
+// Txn.exec(counter, TxnType.READ_PROMOTE, ()->{
+// ThreadAction a = ThreadTxn.threadTxnWrite(counter, ()->{});
+// // a is in a W transaction but has not committed or aborted - it's paused.
+// boolean b = counter.promote();
+// // Never reach here.
+// a.run();
+// });
+// }
+
+ @Test
+ public void txn_threaded_02() {
+ //Transactional tx = DatasetGraphFactory.createTxnMem();
+ Transactional tx = counter;
+
+ // Start and enter the W transaction.
+ ThreadAction a = ThreadTxn.threadTxnWrite(tx, ()->{});
+
+ // ThreadAction started ... in W transaction.
+ Txn.exec(tx, TxnType.READ_PROMOTE, ()->{
+ // ... have the thread action complete.
+ a.run();
+ // Blocks promotion.
+ boolean b = tx.promote();
+ assertFalse(b);
+ assertEquals(ReadWrite.READ, tx.transactionMode());
+ });
+ }
- // Tests for thread transactions.
+ @Test
+ public void txn_threaded_03() {
+ Transactional tx = DatasetGraphFactory.createTxnMem();
+ //Transactional tx = counter;
+
+ // Start and enter the W transaction.
+ ThreadAction a = ThreadTxn.threadTxnWriteAbort(tx, ()->{});
+ // ThreadAction started ... in W transaction.
+ Txn.exec(tx, TxnType.READ_PROMOTE, ()->{
+ // ... have the thread action abort..
+ a.run();
+ // Does not block promotion.
+ boolean b = tx.promote();
+ assertTrue(b);
+ assertEquals(ReadWrite.WRITE, tx.transactionMode());
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/jena/blob/ddfb3e45/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransInteger.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransInteger.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransInteger.java
index 5ef6791..3a44301 100644
--- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransInteger.java
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransInteger.java
@@ -207,6 +207,8 @@ public class TransInteger extends TransactionalComponentLifecycle<TransInteger.I
@Override
protected ByteBuffer _commitPrepare(TxnId txnId, IntegerState state) {
+ if ( isReadTxn() )
+ return null;
ByteBuffer x = ByteBuffer.allocate(Long.BYTES) ;
x.putLong(state.txnValue) ;
return x ;
@@ -214,11 +216,15 @@ public class TransInteger extends TransactionalComponentLifecycle<TransInteger.I
@Override
protected void _commit(TxnId txnId, IntegerState state) {
+ if ( isReadTxn() )
+ return ;
writeLocation(state.txnValue) ;
}
@Override
protected void _commitEnd(TxnId txnId, IntegerState state) {
+ if ( isReadTxn() )
+ return ;
value.set(state.txnValue) ;
}
@Override
http://git-wip-us.apache.org/repos/asf/jena/blob/ddfb3e45/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnLib.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnLib.java b/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnLib.java
index a4e39dc..219f77a 100644
--- a/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnLib.java
+++ b/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnLib.java
@@ -28,6 +28,7 @@ import org.apache.jena.system.Txn;
import org.apache.jena.query.ReadWrite ;
import org.junit.Test ;
+/** Txn with DBOE transactions */
public class TestTxnLib extends AbstractTestTxn {
@Test public void libTxn_1() {
@@ -184,7 +185,7 @@ public class TestTxnLib extends AbstractTestTxn {
}
@Test public void libTxnThread_12() {
- long x1 = counter1.get() ;
+ long x1 = counter1.get() ;
ThreadAction t = ThreadTxn.threadTxnRead(unit, () -> {
long z1 = counter1.get() ;
assertEquals("Thread", x1, z1) ;
@@ -192,9 +193,6 @@ public class TestTxnLib extends AbstractTestTxn {
Txn.executeWrite(unit, ()->counter1.inc()) ;
t.run() ;
long x2 = counter1.get() ;
- assertEquals("after", x1+1, x2) ;
+ assertEquals("after::", x1+1, x2) ;
}
-
}
-
-
\ No newline at end of file