You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2011/11/19 21:08:53 UTC

svn commit: r1204062 - in /incubator/jena/Jena2/TDB/trunk: src-dev/dev/ src/main/java/com/hp/hpl/jena/tdb/transaction/ src/test/java/com/hp/hpl/jena/tdb/transaction/

Author: andy
Date: Sat Nov 19 20:08:52 2011
New Revision: 1204062

URL: http://svn.apache.org/viewvc?rev=1204062&view=rev
Log:
JENA-161

Added:
    incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TxnDeadlockTest.java
      - copied, changed from r1204030, incubator/jena/Jena2/TDB/trunk/src-dev/dev/TDBTxnDeadlockTest.java
Removed:
    incubator/jena/Jena2/TDB/trunk/src-dev/dev/TDBTxnDeadlockTest.java
Modified:
    incubator/jena/Jena2/TDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java
    incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TransSystem.java

Modified: incubator/jena/Jena2/TDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/TDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java?rev=1204062&r1=1204061&r2=1204062&view=diff
==============================================================================
--- incubator/jena/Jena2/TDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java (original)
+++ incubator/jena/Jena2/TDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java Sat Nov 19 20:08:52 2011
@@ -72,30 +72,42 @@ public class Transaction
         changesPending = (mode == ReadWrite.WRITE) ;
     }
 
-    /* Commit is a 4 step process
-     * 1/ commitPrepare - call all the components to tell them we are going to commit.
+    /*
+     * Commit is a 4 step process:
+     * 
+     * 1/ commitPrepare - call all the components to tell them we are going to
+     * commit.
+     * 
      * 2/ Actually commit - write the commit point to the journal
+     * 
      * 3/ commitEnact -- make the changes to the original data
-     * 4/ commitClearup -- release resources
-     * The transaction manager is the place which knows all the components in a transaction. 
+     * 
+     * 4/ commitClearup -- release resources. The transaction manager is the
+     * place which knows all the components in a transaction.
+     * 
+     * Synchronization note: The transaction manager can call back into a
+     * transaction, so make sure that the lock for this object is released before
+     * calling into the transaction manager.
      */
     
-    synchronized
     public void commit()
     {
-        // Do prepare, write the COMMIT record.
-        // Enacting is left to the TransactionManager.
-        if ( mode == ReadWrite.WRITE )
+        synchronized (this)
         {
-            if ( state != TxnState.ACTIVE )
-                throw new TDBTransactionException("Transaction has already committed or aborted") ; 
-            prepare() ;
-            journal.write(JournalEntryType.Commit, FileRef.Journal, null) ;
-            journal.sync() ;        // Commit point.
-        }
+            // Do prepare, write the COMMIT record.
+            // Enacting is left to the TransactionManager.
+            if ( mode == ReadWrite.WRITE )
+            {
+                if ( state != TxnState.ACTIVE )
+                    throw new TDBTransactionException("Transaction has already committed or aborted") ; 
+                prepare() ;
+                journal.write(JournalEntryType.Commit, FileRef.Journal, null) ;
+                journal.sync() ;        // Commit point.
+            }
 
-        state = TxnState.COMMITED ;
-        // The transaction manager does the enact and clearup calls 
+            state = TxnState.COMMITED ;
+            // The transaction manager does the enact and clearup calls
+        }
         txnMgr.notifyCommit(this) ;
     }
     
@@ -108,30 +120,32 @@ public class Transaction
             x.commitPrepare(this) ;
     }
 
-    synchronized
     public void abort()
     { 
-        if ( mode == ReadWrite.READ )
+        synchronized (this)
         {
-            state = TxnState.ABORTED ;
-            return ;
-        }
-        
-        if ( state != TxnState.ACTIVE )
-            throw new TDBTransactionException("Transaction has already committed or aborted") ; 
-        
-        // Clearup.
-        for ( BlockMgrJournal x : blkMgrs )
-            x.abort(this) ;
-        
-        for ( NodeTableTrans x : nodeTableTrans )
-            x.abort(this) ;
+            if ( mode == ReadWrite.READ )
+            {
+                state = TxnState.ABORTED ;
+                return ;
+            }
+
+            if ( state != TxnState.ACTIVE )
+                throw new TDBTransactionException("Transaction has already committed or aborted") ; 
 
-        // [TxTDB:TODO]
-        // journal.truncate to last commit 
-        // Not need currently as the journal is only written in prepare. 
+            // Clearup.
+            for ( BlockMgrJournal x : blkMgrs )
+                x.abort(this) ;
+
+            for ( NodeTableTrans x : nodeTableTrans )
+                x.abort(this) ;
+
+            // [TxTDB:TODO]
+            // journal.truncate to last commit 
+            // Not need currently as the journal is only written in prepare. 
 
-        state = TxnState.ABORTED ;
+            state = TxnState.ABORTED ;
+        }
         txnMgr.notifyAbort(this) ;
     }
 
@@ -139,98 +153,97 @@ public class Transaction
      *  read transactions "auto commit" on close().
      *  write transactions must call abort or commit.
      */
-    synchronized
     public void close()
     {
-        switch(state)
+        synchronized (this)
         {
-            case CLOSED:    return ;    // Can call close() repeatedly.
-            case ACTIVE:
-                if ( mode == ReadWrite.READ )
-                    commit() ;
-                else
-                {
-                    SystemTDB.errlog.warn("Transaction not commited or aborted: "+this) ;
-                    abort() ;
-                }
-                break ;
-            default:
+            switch(state)
+            {
+                case CLOSED:    return ;    // Can call close() repeatedly.
+                case ACTIVE:
+                    if ( mode == ReadWrite.READ )
+                        commit() ;
+                    else
+                    {
+                        SystemTDB.errlog.warn("Transaction not commited or aborted: "+this) ;
+                        abort() ;
+                    }
+                    break ;
+                default:
+            }
+
+            state = TxnState.CLOSED ;
         }
-        
-        state = TxnState.CLOSED ;
         txnMgr.notifyClose(this) ;
         
         // Imperfect : too many higher level iterators build on unclosables
-        // (e.g. anoniterators in Iter) 
+        // (e.g. anon iterators in Iter) 
         // so close does not get passed to the base.   
 //        for ( Iterator<?> iter : iterators )
 //            Log.info(this, "Active iterator: "+iter) ;
         
-        // Clear per-transaction temnporary state. 
+        // Clear per-transaction temporary state. 
         iterators.clear() ;
     }
     
     /** A write transaction has been processed and all changes propagated back to the database */  
-    synchronized
     /*package*/ void signalEnacted()
     {
+        synchronized (this)
+        {
         if ( ! changesPending )
             Log.warn(this, "Transaction was a read transaction or a write transaction that has already been flushed") ; 
        changesPending = false ;
+        }
     }
 
     public ReadWrite getMode()                      { return mode ; }
     public TxnState getState()                      { return state ; }
     
     public long getTxnId()                          { return id ; }
-    public TransactionManager getTxnMgr()                  { return txnMgr ; }
+    public TransactionManager getTxnMgr()           { return txnMgr ; }
     
-    public DatasetGraphTxn getActiveDataset()
-    {
-        return activedsg ;
-    }
+    public DatasetGraphTxn getActiveDataset()       { return activedsg ; }
 
     public void setActiveDataset(DatasetGraphTxn activedsg)
-    {
-        this.activedsg = activedsg ;
-    }
+    { this.activedsg = activedsg ; }
 
-    public Journal getJournal()    { return journal ; }
+    public Journal getJournal()                     { return journal ; }
 
     public List<Iterator<?>> iterators()            { return Collections.unmodifiableList(iterators) ; }
-//    public void addIterator(Iterator<?> iter)       { iterators.add(iter) ; }
-//    public void removeIterator(Iterator<?> iter)    { iterators.remove(iter) ; }
+    
+    public void addIterator(Iterator<?> iter)       { iterators.add(iter) ; }
+    public void removeIterator(Iterator<?> iter)    { iterators.remove(iter) ; }
     
     // Debugging versions - concurrency problems show up because concurrent access
     // to iterators.contains can miss entries when removed by another thread.
-    // (At least on Oracle JRE - the underlying array is shuffled down by .remove).
-    // See also JENA-131.
-    // After TDB 0.9 release, remove debug versions and leave code above.
+    // See JENA-131.
+    // After TDB 0.9 release, remove debug code.
 
-    private static final boolean DEBUG = false ;     // Don't check-in to SVN trunk with this set to true.
-
-    public void addIterator(Iterator<?> iter)
-    {
-        if ( ! DEBUG )
-            iterators.add(iter) ;
-        else
-        {
-            if ( iterators.contains(iter) )
-                System.err.println("Already added") ;
-            iterators.add(iter) ;
-        }
-    }
-
-    public void removeIterator(Iterator<? > iter)
-    {
-        if ( ! DEBUG )
-            iterators.remove(iter) ;
-        else
-        {
-            if ( ! iterators.contains(iter) )
-                System.err.println("Already closed or not tracked: "+iter) ;
-        }
-    }
+//    private static final boolean DEBUG = false ;     // Don't check-in to SVN trunk with this set to true.
+//
+//    public void addIterator(Iterator<?> iter)
+//    {
+//        if ( ! DEBUG )
+//            iterators.add(iter) ;
+//        else
+//        {
+//            if ( iterators.contains(iter) )
+//                System.err.println("Already added") ;
+//            iterators.add(iter) ;
+//        }
+//    }
+//
+//    public void removeIterator(Iterator<? > iter)
+//    {
+//        if ( ! DEBUG )
+//            iterators.remove(iter) ;
+//        else
+//        {
+//            if ( ! iterators.contains(iter) )
+//                System.err.println("Already closed or not tracked: "+iter) ;
+//        }
+//    }
     
     public List<TransactionLifecycle> components()
     {

Modified: incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TransSystem.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TransSystem.java?rev=1204062&r1=1204061&r2=1204062&view=diff
==============================================================================
--- incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TransSystem.java (original)
+++ incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TransSystem.java Sat Nov 19 20:08:52 2011
@@ -88,9 +88,10 @@ public class T_TransSystem
     static final int writerCommitSeqRepeats = 4 ;
     static final int writerMaxPause         = 20 ;
 
-    private ExecutorService execService = Executors.newCachedThreadPool() ;
+    static final int numTreadsInPool        = 8 ;           // If <= 0 then use an unbounded thread pool.   
+    private static ExecutorService execService = null ;
     
-    public static void main(String...args)
+    public static void main(String...args) throws InterruptedException
     {
         String x = (MEM?"memory":"disk["+SystemTDB.fileMode()+"]") ;
         
@@ -106,6 +107,10 @@ public class T_TransSystem
         for ( i = 0 ; i < Iterations ; i++ )
         {
             clean() ;
+
+            execService = ( numTreadsInPool > 0 ) 
+                ? Executors.newFixedThreadPool(numTreadsInPool)
+                : Executors.newCachedThreadPool() ;
             
             if (!inlineProgress && logging)
                 log.info(format("Iteration: %d\n", i)) ;
@@ -118,6 +123,10 @@ public class T_TransSystem
                     println() ;
             }
             new T_TransSystem().manyReaderAndOneWriter() ;
+            execService.shutdown() ;
+            if ( ! execService.awaitTermination(10, TimeUnit.SECONDS) )
+                System.err.println("Shutdown didn;'t complete in time") ;
+
         }
         if ( inlineProgress )
         {

Copied: incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TxnDeadlockTest.java (from r1204030, incubator/jena/Jena2/TDB/trunk/src-dev/dev/TDBTxnDeadlockTest.java)
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TxnDeadlockTest.java?p2=incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TxnDeadlockTest.java&p1=incubator/jena/Jena2/TDB/trunk/src-dev/dev/TDBTxnDeadlockTest.java&r1=1204030&r2=1204062&rev=1204062&view=diff
==============================================================================
--- incubator/jena/Jena2/TDB/trunk/src-dev/dev/TDBTxnDeadlockTest.java (original)
+++ incubator/jena/Jena2/TDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/T_TxnDeadlockTest.java Sat Nov 19 20:08:52 2011
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package dev;
+package com.hp.hpl.jena.tdb.transaction;
 
 import java.security.SecureRandom ;
 import java.util.Iterator ;
@@ -25,8 +25,6 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors ;
 import java.util.concurrent.atomic.AtomicInteger ;
 
-import junit.framework.Assert ;
-import org.junit.Test ;
 import org.openjena.atlas.logging.Log ;
 
 import com.hp.hpl.jena.graph.Node ;
@@ -37,12 +35,12 @@ import com.hp.hpl.jena.tdb.StoreConnecti
 import com.hp.hpl.jena.tdb.base.file.Location ;
 import com.hp.hpl.jena.tdb.transaction.TransactionManager ;
 
-public class TDBTxnDeadlockTest {
+public class T_TxnDeadlockTest {
 
     static { 
         Log.setLog4j() ; 
         //Log.enable("TDB") ;
-        Log.enable(TransactionManager.class) ;
+        if ( false ) Log.enable(TransactionManager.class) ;
         //Log.enable(LockMRSW.class) ;
     }
     
@@ -50,7 +48,16 @@ public class TDBTxnDeadlockTest {
 
     private static final SecureRandom numberGenerator = new SecureRandom();
 
-    @Test
+    public static void main(String ... argv)
+    {
+        for(int i = 0 ; i < 1000 ; i++ )
+        {
+            System.out.println("Loop = "+i) ;
+            new T_TxnDeadlockTest().test() ;
+        }
+    }
+    
+    //@Test
     public void test() {
         final StoreConnection storeConnection =
                 StoreConnection.make(Location.mem());
@@ -58,7 +65,7 @@ public class TDBTxnDeadlockTest {
         //ExecutorService executor = Executors.newCachedThreadPool()  ;     // Not seen blocking. 
         // 4 blocks maybe 1 in 4 times
         // 8 blocks (quad core) 2 in 3 times.
-        ExecutorService executor = Executors.newFixedThreadPool(2) ;
+        ExecutorService executor = Executors.newFixedThreadPool(8) ;
 
         final AtomicInteger nbQuadruplesAdded = new AtomicInteger();
 
@@ -124,12 +131,12 @@ public class TDBTxnDeadlockTest {
         
         StoreConnection.release(storeConnection.getLocation());
 
-        System.out.println() ;
+//        System.out.println() ;
         System.out.println() ;
         System.out.println("FINISHED") ;
         
-        // This is unsafe - the quad adds may generate duplicates (ity's unlikly 4 random number reoccur but it's possible). 
-        Assert.assertEquals(count, nbQuadruplesAdded.get());
+//        // This is unsafe - the quad adds may generate duplicates (it's unlikely that 4 random numbers reoccur, but it's possible). 
+//        Assert.assertEquals(count, nbQuadruplesAdded.get());
     }
     
 }