You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2012/06/08 12:53:29 UTC

svn commit: r1348022 - in /jena/trunk/jena-tdb/src: main/java/com/hp/hpl/jena/tdb/StoreConnection.java main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java test/java/com/hp/hpl/jena/tdb/store/AbstractStoreConnections.java

Author: andy
Date: Fri Jun  8 10:53:29 2012
New Revision: 1348022

URL: http://svn.apache.org/viewvc?rev=1348022&view=rev
Log:
JENA-250
Sync on transitions to transactional mode.

Modified:
    jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
    jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
    jena/trunk/jena-tdb/src/test/java/com/hp/hpl/jena/tdb/store/AbstractStoreConnections.java

Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java?rev=1348022&r1=1348021&r2=1348022&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java Fri Jun  8 10:53:29 2012
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package com.hp.hpl.jena.tdb;
+package com.hp.hpl.jena.tdb ;
 
 import java.util.HashMap ;
 import java.util.HashSet ;
@@ -31,159 +31,203 @@ import com.hp.hpl.jena.tdb.store.Dataset
 import com.hp.hpl.jena.tdb.sys.TDBMaker ;
 import com.hp.hpl.jena.tdb.transaction.* ;
 
-
-/** Interface to the TDB transaction mechanism. */ 
+/** Interface to the TDB transaction mechanism. */
 public class StoreConnection
 {
     // A StoreConnection is the reference to the underlying storage.
-    // There is cache of backing datasets, managed by statics in StoreConnection.
+    // There is cache of backing datasets, managed by statics in
+    // StoreConnection.
     // The work of transaction coordination is done in TransactionManager.
-    
+
     private final TransactionManager transactionManager ;
-    private final DatasetGraphTDB baseDSG ;
+    private final DatasetGraphTDB    baseDSG ;
+    private boolean                  isValid = true ;
 
-//    private StoreConnection(Location location)
-//    {
-//        baseDSG = DatasetBuilderStd.build(location) ;
-//        transactionManager = new TransactionManager(baseDSG) ;
-//    }
-//    
+    // private StoreConnection(Location location)
+    // {
+    // baseDSG = DatasetBuilderStd.build(location) ;
+    // transactionManager = new TransactionManager(baseDSG) ;
+    // }
+    //
     private StoreConnection(DatasetGraphTDB dsg)
     {
         baseDSG = dsg ;
         transactionManager = new TransactionManager(baseDSG) ;
     }
-    
-    public Location getLocation() { return baseDSG.getLocation() ; }
-    /** Return the associated transaction manager - do NOT use to manipulate transactions */  
-    public TransactionManager getTransMgr() { return transactionManager ; }
-    
-    /** Return a description of the transaction manager state */  
-    public SysTxnState getTransMgrState() { return transactionManager.state() ; }
-    
-    /** Begin a transaction.  
-     * Terminate a write transaction with {@link Transaction#commit()} or {@link Transaction#abort()}.
-     * Terminate a write transaction with {@link Transaction#close()}.
-     */    
+
+    private void checkValid()
+    {
+        if (!isValid) 
+            throw new TDBTransactionException("StoreConnection inValid (issued before a StoreConnection.release?") ;
+    }
+
+    public Location getLocation()
+    {
+        checkValid() ;
+        return baseDSG.getLocation() ;
+    }
+
+    /**
+     * Return the associated transaction manager - do NOT use to manipulate
+     * transactions
+     */
+    public TransactionManager getTransMgr()
+    {
+        checkValid() ;
+        return transactionManager ;
+    }
+
+    /** Return a description of the transaction manager state */
+    public SysTxnState getTransMgrState()
+    {
+        checkValid() ;
+        return transactionManager.state() ;
+    }
+
+    /**
+     * Begin a transaction. Terminate a write transaction with
+     * {@link Transaction#commit()} or {@link Transaction#abort()}. Terminate a
+     * write transaction with {@link Transaction#close()}.
+     */
     public DatasetGraphTxn begin(ReadWrite mode)
     {
+        checkValid() ;
         return transactionManager.begin(mode) ;
     }
-    
-    /** Begin a transaction, giving it a label.  
-     * Terminate a write transaction with {@link Transaction#commit()} or {@link Transaction#abort()}.
+
+    /**
+     * Begin a transaction, giving it a label. Terminate a write transaction
+     * with {@link Transaction#commit()} or {@link Transaction#abort()}.
      * Terminate a write transaction with {@link Transaction#close()}.
-     */    
+     */
     public DatasetGraphTxn begin(ReadWrite mode, String label)
     {
+        checkValid() ;
         return transactionManager.begin(mode, label) ;
     }
-    
+
+    /**
+     * testing operation - do not use the base dataset without knowing how the
+     * transaction system uses it
+     */
+    public DatasetGraphTDB getBaseDataset()
+    {
+        checkValid() ;
+        return baseDSG ;
+    }
+
+    private static Map<Location, StoreConnection> cache = new HashMap<Location, StoreConnection>() ;
+
     // ---- statics managing the cache.
-    /** Obtain a StoreConenction for a particular location */  
+    /** Obtain a StoreConenction for a particular location */
     public static StoreConnection make(String location)
     {
-        return make(new Location(location)) ; 
+        return make(new Location(location)) ;
     }
 
-    /** testing operation - do not use the base dataset without knowing how the transaction system uses it */
-    public DatasetGraphTDB getBaseDataset() { return baseDSG ; }
-    
-    private static Map<Location, StoreConnection> cache = new HashMap<Location, StoreConnection>() ;
-    
-    
-    /** Stop managing all locations. */  
-    public static synchronized void reset() 
+    /** Stop managing all locations. */
+    public static synchronized void reset()
     {
         // Copy to avoid potential CME.
         Set<Location> x = new HashSet<Location>(cache.keySet()) ;
-        for ( Location loc : x )
+        for (Location loc : x)
             expel(loc, true) ;
         cache.clear() ;
     }
-    
-    /** Stop managing a location. */  
-    public static synchronized void release(Location location)    { expel(location, false) ; }
-        
-    /** Stop managing a location. */  
+
+    /** Stop managing a location. */
+    public static synchronized void release(Location location)
+    {
+        expel(location, false) ;
+    }
+
+    /** Stop managing a location. */
     private static synchronized void expel(Location location, boolean force)
     {
         StoreConnection sConn = cache.get(location) ;
-        if ( sConn == null )
-            return ;
-        if ( ! force && sConn.transactionManager.activeTransactions() )
-            throw new TDBTransactionException("Can't expel: Active transactions for location: "+location) ;
-        
-        // No transactions at this point (or we don't care and are clearing up forcefully.)
+        if (sConn == null) return ;
+        if (!force && sConn.transactionManager.activeTransactions()) throw new TDBTransactionException(
+                                                                                                       "Can't expel: Active transactions for location: "
+                                                                                                           + location) ;
+
+        // No transactions at this point (or we don't care and are clearing up
+        // forcefully.)
         sConn.transactionManager.closedown() ;
         sConn.baseDSG.close() ;
+        sConn.isValid = false ;
         cache.remove(location) ;
     }
-    
-    /** Return a StoreConnection for a particular connection.  
-     * This is used to create transactions for the database at the location.
-     */ 
+
+    /**
+     * Return a StoreConnection for a particular connection. This is used to
+     * create transactions for the database at the location.
+     */
     public static synchronized StoreConnection make(Location location)
     {
         StoreConnection sConn = cache.get(location) ;
-        if ( sConn != null )
-            return sConn ;
-        
+        if (sConn != null) return sConn ;
+
         DatasetGraphTDB dsg = DatasetBuilderStd.build(location) ;
         sConn = _makeAndCache(dsg) ;
-        return sConn ; 
+        return sConn ;
     }
-    
-    /** Return a StoreConnection for a particular connection.  
-     * This is used to create transactions for the database at the location.
-     */ 
+
+    /**
+     * Return a StoreConnection for a particular connection. This is used to
+     * create transactions for the database at the location.
+     */
     public static synchronized StoreConnection make(DatasetGraphTDB dsg)
     {
-        if ( dsg instanceof DatasetGraphTxn )
+        if (dsg instanceof DatasetGraphTxn)
         {
-            //((DatasetGraphTxn)dsg).getTransaction().getBaseDataset() ;
-            throw new TDBTransactionException("Can't make a StoreConnection from a transaction instance - need the base storage DatasetGraphTDB") ;
+            // ((DatasetGraphTxn)dsg).getTransaction().getBaseDataset() ;
+            throw new TDBTransactionException(
+                                              "Can't make a StoreConnection from a transaction instance - need the base storage DatasetGraphTDB") ;
         }
         Location location = dsg.getLocation() ;
-        
+
         StoreConnection sConn = cache.get(location) ;
-        if ( sConn == null )
-            sConn = _makeAndCache(dsg) ;
+        if (sConn == null) sConn = _makeAndCache(dsg) ;
         return sConn ;
     }
-    
-    /** Return the StoreConnection if one already exists for this location, else return null */
+
+    /**
+     * Return the StoreConnection if one already exists for this location, else
+     * return null
+     */
     public static synchronized StoreConnection getExisting(Location location)
     {
         return cache.get(location) ;
     }
-    
+
     private static StoreConnection _makeAndCache(DatasetGraphTDB dsg)
     {
         Location location = dsg.getLocation() ;
         // Just in case ... also poke the old-style cache.
         TDBMaker.releaseLocation(dsg.getLocation()) ;
         StoreConnection sConn = cache.get(location) ;
-        if ( sConn == null )
+        if (sConn == null)
         {
             sConn = new StoreConnection(dsg) ;
             JournalControl.recoverFromJournal(dsg, sConn.transactionManager.getJournal()) ;
-            if ( ! location.isMemUnique() )
-                // Don't cache use-once in-memory datasets.
-                cache.put(location, sConn) ;
+            if (!location.isMemUnique())
+            // Don't cache use-once in-memory datasets.
+            cache.put(location, sConn) ;
             String NS = TDB.PATH ;
             TransactionInfo txInfo = new TransactionInfo(sConn.getTransMgr()) ;
-            ARQMgt.register(NS+".system:type=Transactions", txInfo) ;
+            ARQMgt.register(NS + ".system:type=Transactions", txInfo) ;
         }
-        return sConn ; 
+        return sConn ;
     }
-    
-    /** Return a StoreConnection backed by in-memory datastructures (for testing).
-     */ 
+
+    /**
+     * Return a StoreConnection backed by in-memory datastructures (for
+     * testing).
+     */
     public static StoreConnection createMemUncached()
     {
         DatasetGraphTDB dsg = DatasetBuilderStd.build(Location.mem()) ;
         return new StoreConnection(dsg) ;
     }
-    
+
 }

Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java?rev=1348022&r1=1348021&r2=1348022&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java Fri Jun  8 10:53:29 2012
@@ -211,7 +211,11 @@ public class TransactionManager
                 // Could simply add txn to the commit queue and do it that way.  
                 if ( log() ) log("Commit immediately", txn) ; 
                 
-                // Right after reply?
+                // Currently, all we need is 
+                //    JournalControl.replay(txn) ;
+                // because that plays queued transactions.
+                // But for long term generallity, at the cost of one check of the journal size
+                // we do this sequence.
                 
                 processDelayedReplayQueue(txn) ;
                 enactTransaction(txn) ;

Modified: jena/trunk/jena-tdb/src/test/java/com/hp/hpl/jena/tdb/store/AbstractStoreConnections.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/test/java/com/hp/hpl/jena/tdb/store/AbstractStoreConnections.java?rev=1348022&r1=1348021&r2=1348022&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/test/java/com/hp/hpl/jena/tdb/store/AbstractStoreConnections.java (original)
+++ jena/trunk/jena-tdb/src/test/java/com/hp/hpl/jena/tdb/store/AbstractStoreConnections.java Fri Jun  8 10:53:29 2012
@@ -27,12 +27,16 @@ import org.openjena.atlas.iterator.Iter 
 import org.openjena.atlas.junit.BaseTest ;
 import org.openjena.atlas.lib.FileOps ;
 
+import com.hp.hpl.jena.query.Dataset ;
 import com.hp.hpl.jena.query.ReadWrite ;
+import com.hp.hpl.jena.rdf.model.Model ;
 import com.hp.hpl.jena.sparql.core.DatasetGraph ;
 import com.hp.hpl.jena.sparql.core.Quad ;
 import com.hp.hpl.jena.sparql.sse.SSE ;
 import com.hp.hpl.jena.tdb.ConfigTest ;
 import com.hp.hpl.jena.tdb.StoreConnection ;
+import com.hp.hpl.jena.tdb.TDB ;
+import com.hp.hpl.jena.tdb.TDBFactory ;
 import com.hp.hpl.jena.tdb.base.file.Location ;
 import com.hp.hpl.jena.tdb.sys.SystemTDB ;
 import com.hp.hpl.jena.tdb.transaction.DatasetGraphTxn ;
@@ -193,14 +197,13 @@ public abstract class AbstractStoreConne
         dsgTxn2.end() ;
     }
 
-    //@Test
-    // Does not work yet - or it fails and is detechign something -- unclear.
+    @Test
     public void store_7()
     {
         // No transaction, plain update, then transaction.
         // This tests that the dataset is sync'ed when going into transactional mode. 
         
-        boolean nonTxnData = false ;
+        boolean nonTxnData = true ;
         
         StoreConnection sConn = getStoreConnection() ;
         Location loc = sConn.getLocation() ;
@@ -208,13 +211,10 @@ public abstract class AbstractStoreConne
         if ( nonTxnData ) 
         {
             dsg.add(q) ;
+            TDB.sync(dsg) ;
             assertTrue(dsg.contains(q)) ;
         }
 
-        // DIRECT
-        // problem: this is the only quad after the TXN.
-        // Transition the problem?
-
         DatasetGraphTxn dsgTxn = sConn.begin(ReadWrite.WRITE) ;
         if ( nonTxnData ) 
             assertTrue(dsgTxn.contains(q)) ;
@@ -230,21 +230,29 @@ public abstract class AbstractStoreConne
             assertTrue(dsg.contains(q)) ;
         assertTrue(dsg.contains(q1)) ;
         
-        //StoreConnection.release(loc) ;
+        // release via the transactional machinery 
+        StoreConnection.release(loc) ;
+        sConn = null ;
         
-        sConn = StoreConnection.make(loc) ;
-        DatasetGraph dsg2 = sConn.getBaseDataset() ;
+        StoreConnection sConn2 = StoreConnection.make(loc) ;
+        DatasetGraph dsg2 = sConn2.getBaseDataset() ;
         
-        //DatasetGraph dsg2 = TDBFactory.createDatasetGraph(loc) ;
         if ( nonTxnData ) 
             assertTrue(dsg2.contains(q)) ;
         assertTrue(dsg2.contains(q1)) ;
         
-        DatasetGraphTxn dsgTxn2 = sConn.begin(ReadWrite.READ) ;
+        DatasetGraphTxn dsgTxn2 = sConn2.begin(ReadWrite.READ) ;
         if ( nonTxnData ) 
             assertTrue(dsgTxn2.contains(q)) ;
         assertTrue(dsgTxn2.contains(q1)) ;
         dsgTxn2.end() ;
+
+        // Check API methods work. 
+        Dataset ds = TDBFactory.createDataset(loc) ;
+        ds.begin(ReadWrite.READ) ;
+        Model m = (q.isDefaultGraph() ? ds.getDefaultModel() : ds.getNamedModel("g")) ; 
+        assertEquals( nonTxnData ? 2 : 1 , m.size()) ;
+        ds.end() ;
     }