You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2011/07/13 22:56:25 UTC

svn commit: r1146468 - in /incubator/jena/Experimental/TxTDB/trunk: ./ src-dev/tx/ src/main/java/com/hp/hpl/jena/tdb/ src/main/java/com/hp/hpl/jena/tdb/base/file/ src/main/java/com/hp/hpl/jena/tdb/transaction/

Author: andy
Date: Wed Jul 13 20:56:24 2011
New Revision: 1146468

URL: http://svn.apache.org/viewvc?rev=1146468&view=rev
Log: (empty)

Modified:
    incubator/jena/Experimental/TxTDB/trunk/D1.ttl
    incubator/jena/Experimental/TxTDB/trunk/log4j.properties
    incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java
    incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TestTransactions.java
    incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferChannel.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java

Modified: incubator/jena/Experimental/TxTDB/trunk/D1.ttl
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/D1.ttl?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
Binary files - no diff available.

Modified: incubator/jena/Experimental/TxTDB/trunk/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/log4j.properties?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/log4j.properties (original)
+++ incubator/jena/Experimental/TxTDB/trunk/log4j.properties Wed Jul 13 20:56:24 2011
@@ -18,6 +18,7 @@ log4j.logger.org.openjena.riot=INFO
 
 # TDB 
 log4j.logger.com.hp.hpl.jena.tdb=INFO
+log4j.logger.com.hp.hpl.jena.tdb.transaction=ALL
 log4j.logger.com.hp.hpl.jena.sparql.core.DatasetPrefixStorage=INFO
 
 # Joseki server

Modified: incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java Wed Jul 13 20:56:24 2011
@@ -6,7 +6,14 @@ public class DevTx
     // --------
     // To do for release 0:
     //   Documentation
+    //   StoreConnection.Location.mem ==> directory called "--mem--" and chaos.
+    // Partial replay : base reader exits - replay until blocking higher reader. 
+    // Node writing happens during prepare() regardless of blockers and locks.  Oops.
+    // Abort() etc truncates Journal to 0 - but that might lose a pending transaction.  Oops?? Happens?
+    // Some kind of call after commit for clear up.
+    //    Call when really committed.
     // --------
+    //  CRC and bullet-proof read of Journal.
     
     // Tests
     

Modified: incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TestTransactions.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TestTransactions.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TestTransactions.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TestTransactions.java Wed Jul 13 20:56:24 2011
@@ -18,8 +18,75 @@
 
 package tx;
 
-public class TestTransactions
+import org.junit.After ;
+import org.junit.Before ;
+import org.junit.Test ;
+import org.openjena.atlas.junit.BaseTest ;
+import org.openjena.atlas.lib.FileOps ;
+
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+import com.hp.hpl.jena.sparql.util.NodeFactory ;
+import com.hp.hpl.jena.tdb.ConfigTest ;
+import com.hp.hpl.jena.tdb.DatasetGraphTxn ;
+import com.hp.hpl.jena.tdb.ReadWrite ;
+import com.hp.hpl.jena.tdb.StoreConnection ;
+import com.hp.hpl.jena.tdb.base.file.Location ;
+
+public class TestTransactions extends BaseTest
 {
+    static Node s = NodeFactory.parseNode("<s>") ;
+    static Node p = NodeFactory.parseNode("<p>") ;
+    static Node o = NodeFactory.parseNode("<o>") ;
+    static Node g = NodeFactory.parseNode("<g>") ;
+    static Triple t = new Triple(s,p,o) ;
+    static Quad q = new Quad(g,s,p,o) ;
+    
+    static final String DIR = ConfigTest.getTestingDirDB() ;
+    static final Location LOC = new Location(DIR) ;
+    
+    @Before public void setup()
+    {
+        FileOps.clearDirectory(DIR) ;
+        StoreConnection.reset() ;
+        StoreConnection sConn = StoreConnection.make(LOC) ;
+    }
+    
+    @After public void teardown() {} 
+    
+    @Test public void trans_01()
+    {
+        StoreConnection sConn = StoreConnection.make(LOC) ;
+        DatasetGraphTxn dsg = sConn.begin(ReadWrite.READ) ;
+        dsg.close() ;
+    }
+    
+    @Test public void trans_02()
+    {
+        StoreConnection sConn = StoreConnection.make(LOC) ;
+        DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
+        try {
+            dsg.add(q) ;
+            assertTrue(dsg.contains(q)) ;
+            dsg.commit() ;
+        } finally { dsg.close() ; }
+    }
+    
+    @Test public void trans_03()
+    {
+        StoreConnection sConn = StoreConnection.make(LOC) ;
+        DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
+        
+        dsg.add(q) ;
+        assertTrue(dsg.contains(q)) ;
+        dsg.commit() ;
+        dsg.close() ;
+        
+        DatasetGraphTxn dsg2 = sConn.begin(ReadWrite.READ) ;
+        assertTrue(dsg2.contains(q)) ;
+        dsg2.close() ;
+    }
 
 }
 

Modified: incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java Wed Jul 13 20:56:24 2011
@@ -9,6 +9,7 @@ package tx;
 import org.openjena.atlas.lib.Bytes ;
 import org.openjena.atlas.lib.FileOps ;
 import org.openjena.atlas.logging.Log ;
+import org.openjena.riot.RiotWriter ;
 
 import com.hp.hpl.jena.graph.Graph ;
 import com.hp.hpl.jena.query.DatasetFactory ;
@@ -20,6 +21,7 @@ import com.hp.hpl.jena.query.Syntax ;
 import com.hp.hpl.jena.rdf.model.Model ;
 import com.hp.hpl.jena.rdf.model.ModelFactory ;
 import com.hp.hpl.jena.sparql.core.DatasetGraph ;
+import com.hp.hpl.jena.sparql.sse.SSE ;
 import com.hp.hpl.jena.sparql.util.QueryExecUtils ;
 import com.hp.hpl.jena.tdb.DatasetGraphTxn ;
 import com.hp.hpl.jena.tdb.ReadWrite ;
@@ -58,34 +60,83 @@ public class TxMain
         initFS() ;
         StoreConnection sConn = StoreConnection.make(DBdir) ;
         
+        // ---- Simple
+        if ( false )
+        {
+            DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
+            //load("D.ttl", dsg) ;    // Loads 3 triples
+            dsg.getDefaultGraph().add(SSE.parseTriple("(<s> <p> <o>)")) ;
+            dsg.add(SSE.parseQuad("(<g> <s> <p> 123)")) ;
+            dsg.add(SSE.parseQuad("(_ <s> <p> 123)")) ;
+            
+            dsg.commit() ;
+            dsg.close() ;
+            DatasetGraphTxn dsgRead = sConn.begin(ReadWrite.READ) ;
+            dump(dsgRead) ;
+            query("SELECT ?g (count(*) AS ?C) { { ?s ?p ?o } UNION { GRAPH ?g { ?s ?p ?o } } } GROUP BY ?g", dsgRead) ;
+            dsgRead.close() ;
+            exit(0) ;
+        }
+        
+        if ( false )
+        {
+            // Blocking transaction
+            DatasetGraphTxn dsgRead = sConn.begin(ReadWrite.READ) ;
+            
+            
+            DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ; // Optional label.
+            //load("D.ttl", dsg) ;    // Loads 3 triples
+            dsg.getDefaultGraph().add(SSE.parseTriple("(<s> <p> <o>)")) ;
+            dsg.add(SSE.parseQuad("(<g> <s> <p> 123)")) ;
+            dsg.add(SSE.parseQuad("(_ <s> <p> 123)")) ;
+            dsg.commit() ;
+            dsg.close() ;
+            
+            dump(dsgRead) ;
+            query("SELECT ?g (count(*) AS ?C) { { ?s ?p ?o } UNION { GRAPH ?g { ?s ?p ?o } } } GROUP BY ?g", dsgRead) ;
+            dsgRead.close() ;
+            DatasetGraphTxn dsgRead2  = sConn.begin(ReadWrite.READ) ;
+            query("SELECT ?g (count(*) AS ?C) { { ?s ?p ?o } UNION { GRAPH ?g { ?s ?p ?o } } } GROUP BY ?g", dsgRead2) ;
+            dsgRead2.close() ;
+            exit(0) ;
+        }
+        
+        // BUG somewhere.
+        //   Check DevTx list of things to do.
         // Take a blocking read connection.
-        DatasetGraphTxn dsgRead = sConn.begin(ReadWrite.READ) ; // Optional label.
+        DatasetGraphTxn dsgRead = sConn.begin(ReadWrite.READ) ; //dsgRead.close() ;
 
         DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
         load("D.ttl", dsg) ;    // Loads 3 triples
+        query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead) ;
         query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsg) ;
-
         dsg.commit() ;
         dsg.close() ;
+        dsg = null ;
         
+        // Reader after update.
+        // First reader still reading.
         
-        // Reader still open.
-        // At this point, there is a blocking  
-        
-        dsg = sConn.begin(ReadWrite.WRITE) ;
-        load("D1.ttl", dsg) ; // Loads 1 triples
-        
-        query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsg) ;
+        //DatasetGraphTxn dsgRead2 = sConn.begin(ReadWrite.READ) ;
+        //query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead2) ;
         query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead) ;
-        
-        
-        dsg.commit() ;
-        dsg.close() ;
 
-        DatasetGraphTxn dsgRead2 = sConn.begin(ReadWrite.READ) ;
-        query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead2) ;
-        dsgRead2.close() ;
-        
+        // A writer.
+        DatasetGraphTxn dsg2 = sConn.begin(ReadWrite.WRITE) ;
+        load("D1.ttl", dsg2) ; // Loads 1 triple
+        query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsg2) ;
+        //query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead2) ;
+        query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead) ;
+        dsg2.commit() ;
+        dsg2.close() ;
+        dsg2 = null ;
+
+        //dsgRead2.close() ;
+
+//        DatasetGraphTxn dsgRead2 = sConn.begin(ReadWrite.READ) ;
+//        query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead2) ;
+//        dsgRead2.close() ;
+
         dsgRead.close() ;   // Transaction can now write changes to the real DB.
         
         // ILLEGAL!!!!
@@ -117,6 +168,11 @@ public class TxMain
         return dsg ;
     }
 
+    public static void dump(DatasetGraphTxn dsg)
+    {
+        RiotWriter.writeNQuads(System.out, dsg) ;
+    }
+    
     public static void query(String queryStr, DatasetGraphTxn dsg)
     {
         String x = "Query ("+dsg.getTransaction().getLabel()+")" ;

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java Wed Jul 13 20:56:24 2011
@@ -64,6 +64,14 @@ public class StoreConnection
 
     private static Map<Location, StoreConnection> cache = new HashMap<Location, StoreConnection>() ;
     
+    public static void reset() 
+    {
+        for ( Map.Entry<Location, StoreConnection> e : cache.entrySet() )
+            e.getValue().baseDSG.close() ;
+        
+        cache.clear() ;
+    }
+    
     public static StoreConnection make(Location location)
     {
         TDBMaker.releaseLocation(location) ;

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferChannel.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferChannel.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferChannel.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferChannel.java Wed Jul 13 20:56:24 2011
@@ -45,7 +45,7 @@ public interface BufferChannel extends S
     /** set the position */
     public void position(long pos) ;
 
-    /** Read into a ByteBuffer. Returns the number of bytes read.
+    /** Read into a ByteBuffer. Returns the number of bytes read. -1 for end of file.
      */
     public int read(ByteBuffer buffer) ;
     

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java Wed Jul 13 20:56:24 2011
@@ -135,7 +135,13 @@ class Journal implements Iterable<Journa
         // UGLY Maybe better to leave some space in the block's byte buffer.
         // [TxTDB:TODO] Make robust against partial read.
         header.clear() ;
-        channel.read(header) ;
+        int lenRead = channel.read(header) ;
+        if ( lenRead == -1 )
+        {
+            // probably broken file.
+            throw new TDBTransactionException("Read off the end of a journal file") ;
+            //return null ;
+        }
         header.rewind() ;
         int typeId  = header.getInt() ; 
         int len     = header.getInt() ;

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java Wed Jul 13 20:56:24 2011
@@ -134,7 +134,16 @@ public class JournalControl
             FileOps.delete(objFilename) ;
     }
     
-    public static void replay(Journal journal, DatasetGraphTDB dsg)
+    public static void replay(Transaction transaction)
+    {
+        Journal journal = transaction.getJournal() ;
+        DatasetGraphTDB dsg = transaction.getBaseDataset() ;
+        replay(journal, dsg) ;
+//        Iterator<Transactional> iter = transaction.components() ;
+//        xxxxxxxxxxxxxxxx
+    }
+    
+    private static void replay(Journal journal, DatasetGraphTDB dsg)
     {
         journal.position(0) ;
         dsg.getLock().enterCriticalSection(Lock.WRITE) ;

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java Wed Jul 13 20:56:24 2011
@@ -43,6 +43,12 @@ public class Transaction
         if (label == null )
             label = "Txn" ;
         label = label+"["+id+"]" ;
+        switch(mode)
+        {
+            case READ : label = label+"/R" ; break ;
+            case WRITE : label = label+"/W" ; break ;
+        }
+        
         this.label = label ;
         this.txnMgr = txnMgr ;
         this.basedsg = dsg ;
@@ -61,7 +67,7 @@ public class Transaction
         {
             if ( state != TxnState.ACTIVE )
                 throw new TDBTransactionException("Transaction has already committed or aborted") ; 
-
+            prepare() ;
             JournalEntry entry = new JournalEntry(JournalEntryType.Commit, FileRef.Journal, null) ;
             journal.writeJournal(entry) ;
             journal.sync() ;        // Commit point.
@@ -73,13 +79,7 @@ public class Transaction
     
     private void prepare()
     {
-        if ( mode == ReadWrite.READ )
-            return ;
-        
-        if ( state != TxnState.ACTIVE )
-            throw new TDBTransactionException("Transaction has already committed or aborted") ; 
         state = TxnState.PREPARING ;
-        
         for ( BlockMgrJournal x : blkMgrs )
             x.commitPrepare(this) ;
         for ( NodeTableTrans x : nodeTableTrans )
@@ -110,6 +110,7 @@ public class Transaction
         
         for ( NodeTableTrans x : nodeTableTrans )
             x.abort(this) ;
+        // [TxTDB:TODO] : No - truncates a pending transaction.
         journal.truncate(0) ;
 
         state = TxnState.ABORTED ;
@@ -125,6 +126,7 @@ public class Transaction
     {
         switch(state)
         {
+            case CLOSED:    return ;    // Can call close() repeatedly.
             case ACTIVE:
                 if ( mode == ReadWrite.READ )
                     commit() ;
@@ -151,18 +153,8 @@ public class Transaction
     }
     
     public ReadWrite getMode()                      { return mode ; }
-    public TxnState getState()                         { return state ; }
+    public TxnState getState()                      { return state ; }
     
-    List<NodeTableTrans> getNodeTableTrans()
-    {
-        return nodeTableTrans ;
-    }
-
-    List<BlockMgrJournal> getBlkMgrs()
-    {
-        return blkMgrs ;
-    }
-
     public long getTxnId()                          { return id ; }
     TransactionManager getTxnMgr()                  { return txnMgr ; }
     

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java Wed Jul 13 20:56:24 2011
@@ -7,6 +7,7 @@
 package com.hp.hpl.jena.tdb.transaction;
 
 import static com.hp.hpl.jena.tdb.ReadWrite.READ ;
+import static com.hp.hpl.jena.tdb.sys.SystemTDB.syslog ;
 import static java.lang.String.format ;
 
 import java.util.ArrayList ;
@@ -19,6 +20,7 @@ import java.util.concurrent.LinkedBlocki
 
 import org.openjena.atlas.logging.Log ;
 import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
 
 import com.hp.hpl.jena.tdb.DatasetGraphTxn ;
 import com.hp.hpl.jena.tdb.ReadWrite ;
@@ -30,7 +32,7 @@ public class TransactionManager
     // TODO Don't keep counter, keep lists.
     // TODO Useful logging.
     
-    private static Logger log = SystemTDB.syslog ;
+    private static Logger log = LoggerFactory.getLogger(TransactionManager.class) ;
     
     private Set<Transaction> activeTransactions = new HashSet<Transaction>() ;
     
@@ -105,7 +107,6 @@ public class TransactionManager
         if ( ! commitedAwaitingFlush.isEmpty() )
             dsg = commitedAwaitingFlush.get(commitedAwaitingFlush.size()-1).getActiveDataset() ;
         
-        
         Transaction txn = createTransaction(dsg, mode, label) ;
         DatasetGraphTxn dsgTxn = (DatasetGraphTxn)new DatasetBuilderTxn(this).build(txn, mode, dsg) ;
         txn.setActiveDataset(dsgTxn) ;
@@ -116,16 +117,14 @@ public class TransactionManager
             iter.next().begin(dsgTxn.getTransaction()) ;
 
         activeTransactions.add(txn) ;
-        if ( log.isDebugEnabled() )
-            log.debug("begin: "+txn) ;
+        log("begin",txn) ;
         return dsgTxn ;
     }
 
     synchronized
     public void notifyCommit(Transaction transaction)
     {
-        if ( log.isDebugEnabled() )
-            log.debug("commit: "+transaction) ;
+        log("commit", transaction) ;
 
         // Transaction has done the commitPrepare - can we enact it?
         
@@ -134,44 +133,44 @@ public class TransactionManager
         
         endTransaction(transaction) ;
         
-        if ( readers == 0 && transaction.getMode() == ReadWrite.WRITE )
-            // New readers blocked from starting by the synchronized here and on begin. 
-            JournalControl.replay(transaction.getJournal(), transaction.getBaseDataset()) ;
-        else
+        if ( transaction.getMode() == ReadWrite.WRITE )
         {
-            commitedAwaitingFlush.add(transaction) ;
-            if ( log.isDebugEnabled() )
-                log.info("Commit blocked at the moment") ;
-            queue.add(transaction) ;
+            if ( readers == 0 )
+                // Can commit immediately.
+                commitTransaction(transaction) ;
+            else
+            {
+                // Can't make permanent at the moment.
+                commitedAwaitingFlush.add(transaction) ;
+                log.debug("Commit pending: "+transaction.getLabel()); 
+
+                //if ( log.isDebugEnabled() )
+                //    log.debug("Commit blocked at the moment") ;
+                queue.add(transaction) ;
+            }
         }
     }
 
     private void commitTransaction(Transaction transaction)
     {
         // Really, really do it!
-        for ( BlockMgrJournal x : transaction.getBlkMgrs() )
-        {
-            x.commitEnact(transaction) ;
-            x.clearup(transaction) ;
-        }
-        
-        for ( NodeTableTrans x : transaction.getNodeTableTrans() )
+        Iterator<Transactional> iter = transaction.components() ;
+        for ( ; iter.hasNext() ; )
         {
+            Transactional x = iter.next() ;
             x.commitEnact(transaction) ;
             x.clearup(transaction) ;
         }
-        
         // This cleans up as well.
-        JournalControl.replay(transaction.getJournal(), transaction.getBaseDataset()) ;
+        JournalControl.replay(transaction) ;
     }
     
     synchronized
     public void notifyAbort(Transaction transaction)
-    {    
+    {
+        log("abort", transaction) ;
         // Transaction has done the abort on all the transactional elements.
         // TODO Suppose the system journal has  
-        if ( log.isDebugEnabled() )
-            log.info("abort: "+transaction) ;
         if ( ! activeTransactions.contains(transaction) )
             SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ;
         endTransaction(transaction) ;
@@ -180,13 +179,12 @@ public class TransactionManager
     synchronized
     public void notifyClose(Transaction txn)
     {
-        if ( log.isDebugEnabled() )
-            log.debug("notifyClose: "+txn) ;
+        log("close", txn) ;
         
         if ( txn.getState() == TxnState.ACTIVE )
         {
             String x = txn.getBaseDataset().getLocation().getDirectoryPath() ;
-            SystemTDB.syslog.warn("close: Transaction not commited or aborted: Transaction: "+txn.getTxnId()+" @ "+x) ;
+            syslog.warn("close: Transaction not commited or aborted: Transaction: "+txn.getTxnId()+" @ "+x) ;
             txn.abort() ;
         }
 
@@ -202,10 +200,9 @@ public class TransactionManager
                     Transaction txn2 = queue.take() ;
                     if ( txn2.getMode() == READ )
                         continue ;
-                    log.info("Delayed commit") ;
+                    log("Delayed commit", txn2) ;
                     // This takes a Write lock on the  DSG - this is where it blocks.
-                    JournalControl.replay(txn2.getJournal(), txn2.getBaseDataset()) ;
-                    log.info("Delayed commit succeeded") ;
+                    JournalControl.replay(txn2) ;
                     commitedAwaitingFlush.remove(txn) ;
                 } catch (InterruptedException ex)
                 { Log.fatal(this, "Interruped!", ex) ; }
@@ -213,8 +210,7 @@ public class TransactionManager
         }
         else
         {
-            if ( log.isDebugEnabled() )
-                log.debug(format("Pending transactions: R=%d / W=%d", readers, writers)) ;
+            if ( log() ) log(format("Pending transactions: R=%d / W=%d", readers, writers), txn) ;
         }
     }
     
@@ -227,6 +223,21 @@ public class TransactionManager
         activeTransactions.remove(transaction) ;
         
     }
+    
+    private boolean log()
+    {
+        return syslog.isDebugEnabled() || log.isDebugEnabled() ;
+    }
+    
+    private void log(String msg, Transaction txn)
+    {
+        if ( ! log() )
+            return ;
+        if ( syslog.isDebugEnabled() )
+            syslog.debug(txn.getLabel()+": "+msg) ;
+        else
+            log.debug(txn.getLabel()+": "+msg) ;
+    }
 
     // LATER.
     class Committer implements Runnable
@@ -244,7 +255,7 @@ public class TransactionManager
                     Transaction txn = queue.take() ;
                     System.out.println("Async commit") ;
                     // This takes a Write lock on the  DSG - this is where it blocks.
-                    JournalControl.replay(txn.getJournal(), txn.getBaseDataset()) ;
+                    JournalControl.replay(txn) ;
                     System.out.println("Async commit succeeded") ;
                     synchronized(TransactionManager.this)
                     {