You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2011/11/19 21:08:53 UTC
svn commit: r1204062 - in /incubator/jena/Jena2/TDB/trunk: src-dev/dev/
src/main/java/com/hp/hpl/jena/tdb/transaction/
src/test/java/com/hp/hpl/jena/tdb/transaction/
Author: andy
Date: Sat Nov 19 20:08:52 2011
New Revision: 1204062
URL: http://svn.apache.org/viewvc?rev=1204062&view=rev
Log:
JENA-161
Added:
incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TxnDeadlockTest.java
- copied, changed from r1204030, incubator/jena/Jena2/TDB/trunk/src-dev/dev/TDBTxnDeadlockTest.java
Removed:
incubator/jena/Jena2/TDB/trunk/src-dev/dev/TDBTxnDeadlockTest.java
Modified:
incubator/jena/Jena2/TDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java
incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TransSystem.java
Modified: incubator/jena/Jena2/TDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/TDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java?rev=1204062&r1=1204061&r2=1204062&view=diff
==============================================================================
--- incubator/jena/Jena2/TDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java (original)
+++ incubator/jena/Jena2/TDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java Sat Nov 19 20:08:52 2011
@@ -72,30 +72,42 @@ public class Transaction
changesPending = (mode == ReadWrite.WRITE) ;
}
- /* Commit is a 4 step process
- * 1/ commitPrepare - call all the components to tell them we are going to commit.
+ /*
+ * Commit is a 4 step process:
+ *
+ * 1/ commitPrepare - call all the components to tell them we are going to
+ * commit.
+ *
* 2/ Actually commit - write the commit point to the journal
+ *
* 3/ commitEnact -- make the changes to the original data
- * 4/ commitClearup -- release resources
- * The transaction manager is the place which knows all the components in a transaction.
+ *
+ * 4/ commitClearup -- release resources.
+ * The transaction manager is the place which knows all the components in a transaction.
+ *
+ * Synchronization note: The transaction manager can call back into a
+ * transaction so make sure that the lock for this object is released before
+ * calling into the transaction manager
*/
- synchronized
public void commit()
{
- // Do prepare, write the COMMIT record.
- // Enacting is left to the TransactionManager.
- if ( mode == ReadWrite.WRITE )
+ synchronized (this)
{
- if ( state != TxnState.ACTIVE )
- throw new TDBTransactionException("Transaction has already committed or aborted") ;
- prepare() ;
- journal.write(JournalEntryType.Commit, FileRef.Journal, null) ;
- journal.sync() ; // Commit point.
- }
+ // Do prepare, write the COMMIT record.
+ // Enacting is left to the TransactionManager.
+ if ( mode == ReadWrite.WRITE )
+ {
+ if ( state != TxnState.ACTIVE )
+ throw new TDBTransactionException("Transaction has already committed or aborted") ;
+ prepare() ;
+ journal.write(JournalEntryType.Commit, FileRef.Journal, null) ;
+ journal.sync() ; // Commit point.
+ }
- state = TxnState.COMMITED ;
- // The transaction manager does the enact and clearup calls
+ state = TxnState.COMMITED ;
+ // The transaction manager does the enact and clearup calls
+ }
txnMgr.notifyCommit(this) ;
}
@@ -108,30 +120,32 @@ public class Transaction
x.commitPrepare(this) ;
}
- synchronized
public void abort()
{
- if ( mode == ReadWrite.READ )
+ synchronized (this)
{
- state = TxnState.ABORTED ;
- return ;
- }
-
- if ( state != TxnState.ACTIVE )
- throw new TDBTransactionException("Transaction has already committed or aborted") ;
-
- // Clearup.
- for ( BlockMgrJournal x : blkMgrs )
- x.abort(this) ;
-
- for ( NodeTableTrans x : nodeTableTrans )
- x.abort(this) ;
+ if ( mode == ReadWrite.READ )
+ {
+ state = TxnState.ABORTED ;
+ return ;
+ }
+
+ if ( state != TxnState.ACTIVE )
+ throw new TDBTransactionException("Transaction has already committed or aborted") ;
- // [TxTDB:TODO]
- // journal.truncate to last commit
- // Not need currently as the journal is only written in prepare.
+ // Clearup.
+ for ( BlockMgrJournal x : blkMgrs )
+ x.abort(this) ;
+
+ for ( NodeTableTrans x : nodeTableTrans )
+ x.abort(this) ;
+
+ // [TxTDB:TODO]
+ // journal.truncate to last commit
+ // Not need currently as the journal is only written in prepare.
- state = TxnState.ABORTED ;
+ state = TxnState.ABORTED ;
+ }
txnMgr.notifyAbort(this) ;
}
@@ -139,98 +153,97 @@ public class Transaction
* read transactions "auto commit" on close().
* write transactions must call abort or commit.
*/
- synchronized
public void close()
{
- switch(state)
+ synchronized (this)
{
- case CLOSED: return ; // Can call close() repeatedly.
- case ACTIVE:
- if ( mode == ReadWrite.READ )
- commit() ;
- else
- {
- SystemTDB.errlog.warn("Transaction not commited or aborted: "+this) ;
- abort() ;
- }
- break ;
- default:
+ switch(state)
+ {
+ case CLOSED: return ; // Can call close() repeatedly.
+ case ACTIVE:
+ if ( mode == ReadWrite.READ )
+ commit() ;
+ else
+ {
+ SystemTDB.errlog.warn("Transaction not commited or aborted: "+this) ;
+ abort() ;
+ }
+ break ;
+ default:
+ }
+
+ state = TxnState.CLOSED ;
}
-
- state = TxnState.CLOSED ;
txnMgr.notifyClose(this) ;
// Imperfect : too many higher level iterators build on unclosables
- // (e.g. anoniterators in Iter)
+ // (e.g. anon iterators in Iter)
// so close does not get passed to the base.
// for ( Iterator<?> iter : iterators )
// Log.info(this, "Active iterator: "+iter) ;
- // Clear per-transaction temnporary state.
+ // Clear per-transaction temporary state.
iterators.clear() ;
}
/** A write transaction has been processed and all chanages propageted back to the database */
- synchronized
/*package*/ void signalEnacted()
{
+ synchronized (this)
+ {
if ( ! changesPending )
Log.warn(this, "Transaction was a read transaction or a write transaction that has already been flushed") ;
changesPending = false ;
+ }
}
public ReadWrite getMode() { return mode ; }
public TxnState getState() { return state ; }
public long getTxnId() { return id ; }
- public TransactionManager getTxnMgr() { return txnMgr ; }
+ public TransactionManager getTxnMgr() { return txnMgr ; }
- public DatasetGraphTxn getActiveDataset()
- {
- return activedsg ;
- }
+ public DatasetGraphTxn getActiveDataset() { return activedsg ; }
public void setActiveDataset(DatasetGraphTxn activedsg)
- {
- this.activedsg = activedsg ;
- }
+ { this.activedsg = activedsg ; }
- public Journal getJournal() { return journal ; }
+ public Journal getJournal() { return journal ; }
public List<Iterator<?>> iterators() { return Collections.unmodifiableList(iterators) ; }
-// public void addIterator(Iterator<?> iter) { iterators.add(iter) ; }
-// public void removeIterator(Iterator<?> iter) { iterators.remove(iter) ; }
+
+ public void addIterator(Iterator<?> iter) { iterators.add(iter) ; }
+ public void removeIterator(Iterator<?> iter) { iterators.remove(iter) ; }
// Debugging versions - concurrency problems show up because concurrent access
// to iterators.contains can miss entries when removed by abother thread.
- // (At least on Oracle JRE - the underlying array is shuffled down by .remove).
- // See also JENA-131.
- // After TDB 0.9 release, remove debug versions and leave code above.
+ // See JENA-131.
+ // After TDB 0.9 release, remove debug code.
- private static final boolean DEBUG = false ; // Don't check-in to SVN trunk with this set to true.
-
- public void addIterator(Iterator<?> iter)
- {
- if ( ! DEBUG )
- iterators.add(iter) ;
- else
- {
- if ( iterators.contains(iter) )
- System.err.println("Already added") ;
- iterators.add(iter) ;
- }
- }
-
- public void removeIterator(Iterator<? > iter)
- {
- if ( ! DEBUG )
- iterators.remove(iter) ;
- else
- {
- if ( ! iterators.contains(iter) )
- System.err.println("Already closed or not tracked: "+iter) ;
- }
- }
+// private static final boolean DEBUG = false ; // Don't check-in to SVN trunk with this set to true.
+//
+// public void addIterator(Iterator<?> iter)
+// {
+// if ( ! DEBUG )
+// iterators.add(iter) ;
+// else
+// {
+// if ( iterators.contains(iter) )
+// System.err.println("Already added") ;
+// iterators.add(iter) ;
+// }
+// }
+//
+// public void removeIterator(Iterator<? > iter)
+// {
+// if ( ! DEBUG )
+// iterators.remove(iter) ;
+// else
+// {
+// if ( ! iterators.contains(iter) )
+// System.err.println("Already closed or not tracked: "+iter) ;
+// }
+// }
public List<TransactionLifecycle> components()
{
Modified: incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TransSystem.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TransSystem.java?rev=1204062&r1=1204061&r2=1204062&view=diff
==============================================================================
--- incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TransSystem.java (original)
+++ incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TransSystem.java Sat Nov 19 20:08:52 2011
@@ -88,9 +88,10 @@ public class T_TransSystem
static final int writerCommitSeqRepeats = 4 ;
static final int writerMaxPause = 20 ;
- private ExecutorService execService = Executors.newCachedThreadPool() ;
+ static final int numTreadsInPool = 8 ; // If <= 0 then use an unbounded thread pool.
+ private static ExecutorService execService = null ;
- public static void main(String...args)
+ public static void main(String...args) throws InterruptedException
{
String x = (MEM?"memory":"disk["+SystemTDB.fileMode()+"]") ;
@@ -106,6 +107,10 @@ public class T_TransSystem
for ( i = 0 ; i < Iterations ; i++ )
{
clean() ;
+
+ execService = ( numTreadsInPool > 0 )
+ ? Executors.newFixedThreadPool(numTreadsInPool)
+ : Executors.newCachedThreadPool() ;
if (!inlineProgress && logging)
log.info(format("Iteration: %d\n", i)) ;
@@ -118,6 +123,10 @@ public class T_TransSystem
println() ;
}
new T_TransSystem().manyReaderAndOneWriter() ;
+ execService.shutdown() ;
+ if ( ! execService.awaitTermination(10, TimeUnit.SECONDS) )
+ System.err.println("Shutdown didn;'t complete in time") ;
+
}
if ( inlineProgress )
{
Copied: incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TxnDeadlockTest.java (from r1204030, incubator/jena/Jena2/TDB/trunk/src-dev/dev/TDBTxnDeadlockTest.java)
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TxnDeadlockTest.java?p2=incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TxnDeadlockTest.java&p1=incubator/jena/Jena2/TDB/trunk/src-dev/dev/TDBTxnDeadlockTest.java&r1=1204030&r2=1204062&rev=1204062&view=diff
==============================================================================
--- incubator/jena/Jena2/TDB/trunk/src-dev/dev/TDBTxnDeadlockTest.java (original)
+++ incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TxnDeadlockTest.java Sat Nov 19 20:08:52 2011
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package dev;
+package com.hp.hpl.jena.tdb.transaction;
import java.security.SecureRandom ;
import java.util.Iterator ;
@@ -25,8 +25,6 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors ;
import java.util.concurrent.atomic.AtomicInteger ;
-import junit.framework.Assert ;
-import org.junit.Test ;
import org.openjena.atlas.logging.Log ;
import com.hp.hpl.jena.graph.Node ;
@@ -37,12 +35,12 @@ import com.hp.hpl.jena.tdb.StoreConnecti
import com.hp.hpl.jena.tdb.base.file.Location ;
import com.hp.hpl.jena.tdb.transaction.TransactionManager ;
-public class TDBTxnDeadlockTest {
+public class T_TxnDeadlockTest {
static {
Log.setLog4j() ;
//Log.enable("TDB") ;
- Log.enable(TransactionManager.class) ;
+ if ( false ) Log.enable(TransactionManager.class) ;
//Log.enable(LockMRSW.class) ;
}
@@ -50,7 +48,16 @@ public class TDBTxnDeadlockTest {
private static final SecureRandom numberGenerator = new SecureRandom();
- @Test
+ public static void main(String ... argv)
+ {
+ for(int i = 0 ; i < 1000 ; i++ )
+ {
+ System.out.println("Loop = "+i) ;
+ new T_TxnDeadlockTest().test() ;
+ }
+ }
+
+ //@Test
public void test() {
final StoreConnection storeConnection =
StoreConnection.make(Location.mem());
@@ -58,7 +65,7 @@ public class TDBTxnDeadlockTest {
//ExecutorService executor = Executors.newCachedThreadPool() ; // Not seen blocking.
// 4 blocks maybe 1 in 4 times
// 8 blocks (quad core) 2 in 3 times.
- ExecutorService executor = Executors.newFixedThreadPool(2) ;
+ ExecutorService executor = Executors.newFixedThreadPool(8) ;
final AtomicInteger nbQuadruplesAdded = new AtomicInteger();
@@ -124,12 +131,12 @@ public class TDBTxnDeadlockTest {
StoreConnection.release(storeConnection.getLocation());
- System.out.println() ;
+// System.out.println() ;
System.out.println() ;
System.out.println("FINISHED") ;
- // This is unsafe - the quad adds may generate duplicates (ity's unlikly 4 random number reoccur but it's possible).
- Assert.assertEquals(count, nbQuadruplesAdded.get());
+// // This is unsafe - the quad adds may generate duplicates (it's unlikely that 4 random numbers reoccur, but it's possible).
+// Assert.assertEquals(count, nbQuadruplesAdded.get());
}
}