You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2016/08/17 16:01:42 UTC
[09/11] jena git commit: Tests for active writer when promote happens
Tests for active writer when promote happens
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/f972dde6
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/f972dde6
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/f972dde6
Branch: refs/heads/master
Commit: f972dde69beccefad2e01cf97cc7708917a309ab
Parents: f0d389d
Author: Andy Seaborne <an...@apache.org>
Authored: Sun Aug 14 20:54:52 2016 +0100
Committer: Andy Seaborne <an...@apache.org>
Committed: Sun Aug 14 20:54:52 2016 +0100
----------------------------------------------------------------------
.../jena/tdb/transaction/TestTransPromote.java | 99 +++++++++++++++++++-
1 file changed, 96 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/f972dde6/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
index 2ae1ad1..43bc02a 100644
--- a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
+++ b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
@@ -21,10 +21,11 @@ package org.apache.jena.tdb.transaction ;
import static org.junit.Assert.assertEquals ;
import static org.junit.Assert.fail ;
-import java.util.concurrent.Semaphore ;
+import java.util.concurrent.* ;
import java.util.concurrent.atomic.AtomicInteger ;
import org.apache.jena.atlas.iterator.Iter ;
+import org.apache.jena.atlas.lib.Lib ;
import org.apache.jena.query.ReadWrite ;
import org.apache.jena.sparql.core.DatasetGraph ;
import org.apache.jena.sparql.core.Quad ;
@@ -301,9 +302,10 @@ public class TestTransPromote {
// Test whether an active writer causes a snapshot isolation
// promotion to fail like it should.
- // No equivalent read commiter version - it would block.
+ // The promotion is on the test's thread.
+ // No equivalent "read committed" version - it would block.
@Test(expected=TDBTransactionException.class)
- public void promote_clash_1() {
+ public void promote_active_writer_0() {
DatasetGraphTransaction.readCommittedPromotion = false ;
DatasetGraph dsg = create() ;
Semaphore sema1 = new Semaphore(0) ;
@@ -326,7 +328,98 @@ public class TestTransPromote {
// If snapshot, this will cause an exception due to the active writer.
dsg.add(q1) ;
fail("Should not be here") ;
+ sema2.release(1);
dsg.commit() ;
dsg.end() ;
}
+
+ @Test(expected=TDBTransactionException.class)
+ public void promote_active_writer_1() throws InterruptedException, ExecutionException {
+ // Scenario: the active writer commits, so the attempted promotion is expected to fail with TDBTransactionException.
+ promote_active_writer(true) ;
+ }
+
+ // Current implementation in TDB - the active writer causes "no promotion" even though it will abort.
+ @Test(expected=TDBTransactionException.class)
+ public void promote_active_writer_2() throws InterruptedException, ExecutionException {
+ // Scenario: the active writer aborts, so promotion would be possible in principle (but it is not implemented that way), hence the expected exception.
+ promote_active_writer(false) ;
+ }
+    /**
+     * Runs the "active writer vs. attempted promotion" scenario on a dedicated
+     * two-thread pool and always shuts the pool down afterwards.
+     *
+     * @param activeWriterCommit {@code true} if the concurrent writer should commit,
+     *                           {@code false} if it should abort
+     */
+    private void promote_active_writer(boolean activeWriterCommit) {
+        // Two workers: one runs the active writer, the other the promoting reader.
+        ExecutorService executor = Executors.newFixedThreadPool(2) ;
+        try {
+            // Forward the caller's choice. It was hard-coded to false, so the
+            // "writer commits" case was never actually exercised.
+            promote_clash_active_writer(executor, activeWriterCommit) ;
+        } finally {
+            executor.shutdown();
+        }
+    }
+
+ private void promote_clash_active_writer(ExecutorService executor, boolean activeWriterCommit) { // A reader attempts to promote while another thread holds a write transaction.
+ DatasetGraphTransaction.readCommittedPromotion = false ;
+ Semaphore semaActiveWriterStart = new Semaphore(0) ;
+ Semaphore semaActiveWriterContinue = new Semaphore(0) ;
+ Semaphore semaPromoteTxnStart = new Semaphore(0) ;
+ Semaphore semaPromoteTxnContinue = new Semaphore(0) ;
+
+ DatasetGraph dsg = create() ;
+
+ // The "active writer": begins a write transaction, waits at (*1) until released, then commits or aborts as requested by activeWriterCommit.
+ Callable<Object> activeWriter = ()->{
+ dsg.begin(ReadWrite.WRITE) ;
+ semaActiveWriterStart.release(1) ;
+ // (*1)
+ semaActiveWriterContinue.acquireUninterruptibly(1) ;
+ if ( activeWriterCommit )
+ dsg.commit() ;
+ else
+ dsg.abort();
+ dsg.end() ;
+ return null ;
+ } ;
+
+ Future<Object> activeWriterFuture = executor.submit(activeWriter) ;
+ // Advance "active writer" to (*1), inside a write transaction and waiting.
+ // The transaction has been created and started.
+ semaActiveWriterStart.acquireUninterruptibly();
+
+ Callable<TDBTransactionException> attemptedPromote = ()->{
+ dsg.begin(ReadWrite.READ) ;
+ semaPromoteTxnStart.release(1) ;
+ // (*2)
+ semaPromoteTxnContinue.acquireUninterruptibly();
+ try {
+ // (*3) Writing inside the read transaction is the promotion attempt.
+ dsg.add(q1) ;
+ //System.err.println("PROMOTED");
+ return null ; // null result: the add went through without a TDBTransactionException.
+ } catch (TDBTransactionException e) {
+ //System.err.println("NOT PROMOTED");
+ return e ; // Hand the failure back to the test thread via the Future.
+ }
+ } ;
+
+ Future<TDBTransactionException> attemptedPromoteFuture = executor.submit(attemptedPromote) ;
+ // Advance "attempted promote" to (*2), inside a read transaction, before attempting a promoting write.
+ // The transaction has been created and started.
+ semaPromoteTxnStart.acquireUninterruptibly();
+
+ // Release "attempted promote" so that it can proceed to (*3), where it blocks.
+ // This may happen at any time - as soon as it does, the "attempted promote" blocks.
+ semaPromoteTxnContinue.release(1);
+ // No better way is known to ensure "attempted promote" is blocked than the short sleep that follows.
+
+ Lib.sleep(100) ;
+ // Let the active writer go.
+ semaActiveWriterContinue.release(1);
+
+ try {
+ // Collect the active writer.
+ activeWriterFuture.get();
+
+ // Ideally the attempted promotion would advance if the active writer aborts; an exception captured by the promoting thread is rethrown here.
+ TDBTransactionException e = attemptedPromoteFuture.get() ;
+ if ( e != null )
+ throw e ;
+ } catch (InterruptedException | ExecutionException e1) { throw new RuntimeException(e1) ; }
+ }
}