You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2012/06/08 12:53:29 UTC
svn commit: r1348022 - in /jena/trunk/jena-tdb/src:
main/java/com/hp/hpl/jena/tdb/StoreConnection.java
main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
test/java/com/hp/hpl/jena/tdb/store/AbstractStoreConnections.java
Author: andy
Date: Fri Jun 8 10:53:29 2012
New Revision: 1348022
URL: http://svn.apache.org/viewvc?rev=1348022&view=rev
Log:
JENA-250
Sync on transitions to transactional mode.
Modified:
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
jena/trunk/jena-tdb/src/test/java/com/hp/hpl/jena/tdb/store/AbstractStoreConnections.java
Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java?rev=1348022&r1=1348021&r2=1348022&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java Fri Jun 8 10:53:29 2012
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package com.hp.hpl.jena.tdb;
+package com.hp.hpl.jena.tdb ;
import java.util.HashMap ;
import java.util.HashSet ;
@@ -31,159 +31,203 @@ import com.hp.hpl.jena.tdb.store.Dataset
import com.hp.hpl.jena.tdb.sys.TDBMaker ;
import com.hp.hpl.jena.tdb.transaction.* ;
-
-/** Interface to the TDB transaction mechanism. */
+/** Interface to the TDB transaction mechanism. */
public class StoreConnection
{
// A StoreConnection is the reference to the underlying storage.
- // There is cache of backing datasets, managed by statics in StoreConnection.
+ // There is cache of backing datasets, managed by statics in
+ // StoreConnection.
// The work of transaction coordination is done in TransactionManager.
-
+
private final TransactionManager transactionManager ;
- private final DatasetGraphTDB baseDSG ;
+ private final DatasetGraphTDB baseDSG ;
+ private boolean isValid = true ;
-// private StoreConnection(Location location)
-// {
-// baseDSG = DatasetBuilderStd.build(location) ;
-// transactionManager = new TransactionManager(baseDSG) ;
-// }
-//
+ // private StoreConnection(Location location)
+ // {
+ // baseDSG = DatasetBuilderStd.build(location) ;
+ // transactionManager = new TransactionManager(baseDSG) ;
+ // }
+ //
private StoreConnection(DatasetGraphTDB dsg)
{
baseDSG = dsg ;
transactionManager = new TransactionManager(baseDSG) ;
}
-
- public Location getLocation() { return baseDSG.getLocation() ; }
- /** Return the associated transaction manager - do NOT use to manipulate transactions */
- public TransactionManager getTransMgr() { return transactionManager ; }
-
- /** Return a description of the transaction manager state */
- public SysTxnState getTransMgrState() { return transactionManager.state() ; }
-
- /** Begin a transaction.
- * Terminate a write transaction with {@link Transaction#commit()} or {@link Transaction#abort()}.
- * Terminate a write transaction with {@link Transaction#close()}.
- */
+
+ private void checkValid()
+ {
+ if (!isValid)
+ throw new TDBTransactionException("StoreConnection inValid (issued before a StoreConnection.release?") ;
+ }
+
+ public Location getLocation()
+ {
+ checkValid() ;
+ return baseDSG.getLocation() ;
+ }
+
+ /**
+ * Return the associated transaction manager - do NOT use to manipulate
+ * transactions
+ */
+ public TransactionManager getTransMgr()
+ {
+ checkValid() ;
+ return transactionManager ;
+ }
+
+ /** Return a description of the transaction manager state */
+ public SysTxnState getTransMgrState()
+ {
+ checkValid() ;
+ return transactionManager.state() ;
+ }
+
+ /**
+ * Begin a transaction. Terminate a write transaction with
+ * {@link Transaction#commit()} or {@link Transaction#abort()}. Terminate a
+ * write transaction with {@link Transaction#close()}.
+ */
public DatasetGraphTxn begin(ReadWrite mode)
{
+ checkValid() ;
return transactionManager.begin(mode) ;
}
-
- /** Begin a transaction, giving it a label.
- * Terminate a write transaction with {@link Transaction#commit()} or {@link Transaction#abort()}.
+
+ /**
+ * Begin a transaction, giving it a label. Terminate a write transaction
+ * with {@link Transaction#commit()} or {@link Transaction#abort()}.
* Terminate a write transaction with {@link Transaction#close()}.
- */
+ */
public DatasetGraphTxn begin(ReadWrite mode, String label)
{
+ checkValid() ;
return transactionManager.begin(mode, label) ;
}
-
+
+ /**
+ * testing operation - do not use the base dataset without knowing how the
+ * transaction system uses it
+ */
+ public DatasetGraphTDB getBaseDataset()
+ {
+ checkValid() ;
+ return baseDSG ;
+ }
+
+ private static Map<Location, StoreConnection> cache = new HashMap<Location, StoreConnection>() ;
+
// ---- statics managing the cache.
- /** Obtain a StoreConenction for a particular location */
+ /** Obtain a StoreConenction for a particular location */
public static StoreConnection make(String location)
{
- return make(new Location(location)) ;
+ return make(new Location(location)) ;
}
- /** testing operation - do not use the base dataset without knowing how the transaction system uses it */
- public DatasetGraphTDB getBaseDataset() { return baseDSG ; }
-
- private static Map<Location, StoreConnection> cache = new HashMap<Location, StoreConnection>() ;
-
-
- /** Stop managing all locations. */
- public static synchronized void reset()
+ /** Stop managing all locations. */
+ public static synchronized void reset()
{
// Copy to avoid potential CME.
Set<Location> x = new HashSet<Location>(cache.keySet()) ;
- for ( Location loc : x )
+ for (Location loc : x)
expel(loc, true) ;
cache.clear() ;
}
-
- /** Stop managing a location. */
- public static synchronized void release(Location location) { expel(location, false) ; }
-
- /** Stop managing a location. */
+
+ /** Stop managing a location. */
+ public static synchronized void release(Location location)
+ {
+ expel(location, false) ;
+ }
+
+ /** Stop managing a location. */
private static synchronized void expel(Location location, boolean force)
{
StoreConnection sConn = cache.get(location) ;
- if ( sConn == null )
- return ;
- if ( ! force && sConn.transactionManager.activeTransactions() )
- throw new TDBTransactionException("Can't expel: Active transactions for location: "+location) ;
-
- // No transactions at this point (or we don't care and are clearing up forcefully.)
+ if (sConn == null) return ;
+ if (!force && sConn.transactionManager.activeTransactions()) throw new TDBTransactionException(
+ "Can't expel: Active transactions for location: "
+ + location) ;
+
+ // No transactions at this point (or we don't care and are clearing up
+ // forcefully.)
sConn.transactionManager.closedown() ;
sConn.baseDSG.close() ;
+ sConn.isValid = false ;
cache.remove(location) ;
}
-
- /** Return a StoreConnection for a particular connection.
- * This is used to create transactions for the database at the location.
- */
+
+ /**
+ * Return a StoreConnection for a particular connection. This is used to
+ * create transactions for the database at the location.
+ */
public static synchronized StoreConnection make(Location location)
{
StoreConnection sConn = cache.get(location) ;
- if ( sConn != null )
- return sConn ;
-
+ if (sConn != null) return sConn ;
+
DatasetGraphTDB dsg = DatasetBuilderStd.build(location) ;
sConn = _makeAndCache(dsg) ;
- return sConn ;
+ return sConn ;
}
-
- /** Return a StoreConnection for a particular connection.
- * This is used to create transactions for the database at the location.
- */
+
+ /**
+ * Return a StoreConnection for a particular connection. This is used to
+ * create transactions for the database at the location.
+ */
public static synchronized StoreConnection make(DatasetGraphTDB dsg)
{
- if ( dsg instanceof DatasetGraphTxn )
+ if (dsg instanceof DatasetGraphTxn)
{
- //((DatasetGraphTxn)dsg).getTransaction().getBaseDataset() ;
- throw new TDBTransactionException("Can't make a StoreConnection from a transaction instance - need the base storage DatasetGraphTDB") ;
+ // ((DatasetGraphTxn)dsg).getTransaction().getBaseDataset() ;
+ throw new TDBTransactionException(
+ "Can't make a StoreConnection from a transaction instance - need the base storage DatasetGraphTDB") ;
}
Location location = dsg.getLocation() ;
-
+
StoreConnection sConn = cache.get(location) ;
- if ( sConn == null )
- sConn = _makeAndCache(dsg) ;
+ if (sConn == null) sConn = _makeAndCache(dsg) ;
return sConn ;
}
-
- /** Return the StoreConnection if one already exists for this location, else return null */
+
+ /**
+ * Return the StoreConnection if one already exists for this location, else
+ * return null
+ */
public static synchronized StoreConnection getExisting(Location location)
{
return cache.get(location) ;
}
-
+
private static StoreConnection _makeAndCache(DatasetGraphTDB dsg)
{
Location location = dsg.getLocation() ;
// Just in case ... also poke the old-style cache.
TDBMaker.releaseLocation(dsg.getLocation()) ;
StoreConnection sConn = cache.get(location) ;
- if ( sConn == null )
+ if (sConn == null)
{
sConn = new StoreConnection(dsg) ;
JournalControl.recoverFromJournal(dsg, sConn.transactionManager.getJournal()) ;
- if ( ! location.isMemUnique() )
- // Don't cache use-once in-memory datasets.
- cache.put(location, sConn) ;
+ if (!location.isMemUnique())
+ // Don't cache use-once in-memory datasets.
+ cache.put(location, sConn) ;
String NS = TDB.PATH ;
TransactionInfo txInfo = new TransactionInfo(sConn.getTransMgr()) ;
- ARQMgt.register(NS+".system:type=Transactions", txInfo) ;
+ ARQMgt.register(NS + ".system:type=Transactions", txInfo) ;
}
- return sConn ;
+ return sConn ;
}
-
- /** Return a StoreConnection backed by in-memory datastructures (for testing).
- */
+
+ /**
+ * Return a StoreConnection backed by in-memory datastructures (for
+ * testing).
+ */
public static StoreConnection createMemUncached()
{
DatasetGraphTDB dsg = DatasetBuilderStd.build(Location.mem()) ;
return new StoreConnection(dsg) ;
}
-
+
}
Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java?rev=1348022&r1=1348021&r2=1348022&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java Fri Jun 8 10:53:29 2012
@@ -211,7 +211,11 @@ public class TransactionManager
// Could simply add txn to the commit queue and do it that way.
if ( log() ) log("Commit immediately", txn) ;
- // Right after reply?
+ // Currently, all we need is
+ // JournalControl.replay(txn) ;
+ // because that plays queued transactions.
+ // But for long term generallity, at the cost of one check of the journal size
+ // we do this sequence.
processDelayedReplayQueue(txn) ;
enactTransaction(txn) ;
Modified: jena/trunk/jena-tdb/src/test/java/com/hp/hpl/jena/tdb/store/AbstractStoreConnections.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/test/java/com/hp/hpl/jena/tdb/store/AbstractStoreConnections.java?rev=1348022&r1=1348021&r2=1348022&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/test/java/com/hp/hpl/jena/tdb/store/AbstractStoreConnections.java (original)
+++ jena/trunk/jena-tdb/src/test/java/com/hp/hpl/jena/tdb/store/AbstractStoreConnections.java Fri Jun 8 10:53:29 2012
@@ -27,12 +27,16 @@ import org.openjena.atlas.iterator.Iter
import org.openjena.atlas.junit.BaseTest ;
import org.openjena.atlas.lib.FileOps ;
+import com.hp.hpl.jena.query.Dataset ;
import com.hp.hpl.jena.query.ReadWrite ;
+import com.hp.hpl.jena.rdf.model.Model ;
import com.hp.hpl.jena.sparql.core.DatasetGraph ;
import com.hp.hpl.jena.sparql.core.Quad ;
import com.hp.hpl.jena.sparql.sse.SSE ;
import com.hp.hpl.jena.tdb.ConfigTest ;
import com.hp.hpl.jena.tdb.StoreConnection ;
+import com.hp.hpl.jena.tdb.TDB ;
+import com.hp.hpl.jena.tdb.TDBFactory ;
import com.hp.hpl.jena.tdb.base.file.Location ;
import com.hp.hpl.jena.tdb.sys.SystemTDB ;
import com.hp.hpl.jena.tdb.transaction.DatasetGraphTxn ;
@@ -193,14 +197,13 @@ public abstract class AbstractStoreConne
dsgTxn2.end() ;
}
- //@Test
- // Does not work yet - or it fails and is detechign something -- unclear.
+ @Test
public void store_7()
{
// No transaction, plain update, then transaction.
// This tests that the dataset is sync'ed when going into transactional mode.
- boolean nonTxnData = false ;
+ boolean nonTxnData = true ;
StoreConnection sConn = getStoreConnection() ;
Location loc = sConn.getLocation() ;
@@ -208,13 +211,10 @@ public abstract class AbstractStoreConne
if ( nonTxnData )
{
dsg.add(q) ;
+ TDB.sync(dsg) ;
assertTrue(dsg.contains(q)) ;
}
- // DIRECT
- // problem: this is the only quad after the TXN.
- // Transition the problem?
-
DatasetGraphTxn dsgTxn = sConn.begin(ReadWrite.WRITE) ;
if ( nonTxnData )
assertTrue(dsgTxn.contains(q)) ;
@@ -230,21 +230,29 @@ public abstract class AbstractStoreConne
assertTrue(dsg.contains(q)) ;
assertTrue(dsg.contains(q1)) ;
- //StoreConnection.release(loc) ;
+ // release via the transactional machinery
+ StoreConnection.release(loc) ;
+ sConn = null ;
- sConn = StoreConnection.make(loc) ;
- DatasetGraph dsg2 = sConn.getBaseDataset() ;
+ StoreConnection sConn2 = StoreConnection.make(loc) ;
+ DatasetGraph dsg2 = sConn2.getBaseDataset() ;
- //DatasetGraph dsg2 = TDBFactory.createDatasetGraph(loc) ;
if ( nonTxnData )
assertTrue(dsg2.contains(q)) ;
assertTrue(dsg2.contains(q1)) ;
- DatasetGraphTxn dsgTxn2 = sConn.begin(ReadWrite.READ) ;
+ DatasetGraphTxn dsgTxn2 = sConn2.begin(ReadWrite.READ) ;
if ( nonTxnData )
assertTrue(dsgTxn2.contains(q)) ;
assertTrue(dsgTxn2.contains(q1)) ;
dsgTxn2.end() ;
+
+ // Check API methods work.
+ Dataset ds = TDBFactory.createDataset(loc) ;
+ ds.begin(ReadWrite.READ) ;
+ Model m = (q.isDefaultGraph() ? ds.getDefaultModel() : ds.getNamedModel("g")) ;
+ assertEquals( nonTxnData ? 2 : 1 , m.size()) ;
+ ds.end() ;
}