You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2011/07/13 22:56:25 UTC
svn commit: r1146468 - in /incubator/jena/Experimental/TxTDB/trunk: ./
src-dev/tx/ src/main/java/com/hp/hpl/jena/tdb/
src/main/java/com/hp/hpl/jena/tdb/base/file/
src/main/java/com/hp/hpl/jena/tdb/transaction/
Author: andy
Date: Wed Jul 13 20:56:24 2011
New Revision: 1146468
URL: http://svn.apache.org/viewvc?rev=1146468&view=rev
Log: (empty)
Modified:
incubator/jena/Experimental/TxTDB/trunk/D1.ttl
incubator/jena/Experimental/TxTDB/trunk/log4j.properties
incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java
incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TestTransactions.java
incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferChannel.java
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
Modified: incubator/jena/Experimental/TxTDB/trunk/D1.ttl
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/D1.ttl?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
Binary files - no diff available.
Modified: incubator/jena/Experimental/TxTDB/trunk/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/log4j.properties?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/log4j.properties (original)
+++ incubator/jena/Experimental/TxTDB/trunk/log4j.properties Wed Jul 13 20:56:24 2011
@@ -18,6 +18,7 @@ log4j.logger.org.openjena.riot=INFO
# TDB
log4j.logger.com.hp.hpl.jena.tdb=INFO
+log4j.logger.com.hp.hpl.jena.tdb.transaction=ALL
log4j.logger.com.hp.hpl.jena.sparql.core.DatasetPrefixStorage=INFO
# Joseki server
Modified: incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java Wed Jul 13 20:56:24 2011
@@ -6,7 +6,14 @@ public class DevTx
// --------
// To do for release 0:
// Documentation
+ // StoreConnection.Location.mem ==> directory called "--mem--" and chaos.
+ // Partial replay : base reader exits - replay until blocking higher reader.
+ // Node writing happens during prepare() regardless of blockers and locks. Oops.
+ // Abort() etc truncates Journal to 0 - but that might lose a pending transaction. Oops?? Happens?
+ // Some kind of call after commit for clear up.
+ // Call when really committed.
// --------
+ // CRC and bullet-proof read of Journal.
// Tests
Modified: incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TestTransactions.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TestTransactions.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TestTransactions.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TestTransactions.java Wed Jul 13 20:56:24 2011
@@ -18,8 +18,75 @@
package tx;
-public class TestTransactions
+import org.junit.After ;
+import org.junit.Before ;
+import org.junit.Test ;
+import org.openjena.atlas.junit.BaseTest ;
+import org.openjena.atlas.lib.FileOps ;
+
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+import com.hp.hpl.jena.sparql.util.NodeFactory ;
+import com.hp.hpl.jena.tdb.ConfigTest ;
+import com.hp.hpl.jena.tdb.DatasetGraphTxn ;
+import com.hp.hpl.jena.tdb.ReadWrite ;
+import com.hp.hpl.jena.tdb.StoreConnection ;
+import com.hp.hpl.jena.tdb.base.file.Location ;
+
+public class TestTransactions extends BaseTest
{
+ static Node s = NodeFactory.parseNode("<s>") ;
+ static Node p = NodeFactory.parseNode("<p>") ;
+ static Node o = NodeFactory.parseNode("<o>") ;
+ static Node g = NodeFactory.parseNode("<g>") ;
+ static Triple t = new Triple(s,p,o) ;
+ static Quad q = new Quad(g,s,p,o) ;
+
+ static final String DIR = ConfigTest.getTestingDirDB() ;
+ static final Location LOC = new Location(DIR) ;
+
+ @Before public void setup()
+ {
+ FileOps.clearDirectory(DIR) ;
+ StoreConnection.reset() ;
+ StoreConnection sConn = StoreConnection.make(LOC) ;
+ }
+
+ @After public void teardown() {}
+
+ @Test public void trans_01()
+ {
+ StoreConnection sConn = StoreConnection.make(LOC) ;
+ DatasetGraphTxn dsg = sConn.begin(ReadWrite.READ) ;
+ dsg.close() ;
+ }
+
+ @Test public void trans_02()
+ {
+ StoreConnection sConn = StoreConnection.make(LOC) ;
+ DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
+ try {
+ dsg.add(q) ;
+ assertTrue(dsg.contains(q)) ;
+ dsg.commit() ;
+ } finally { dsg.close() ; }
+ }
+
+ @Test public void trans_03()
+ {
+ StoreConnection sConn = StoreConnection.make(LOC) ;
+ DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
+
+ dsg.add(q) ;
+ assertTrue(dsg.contains(q)) ;
+ dsg.commit() ;
+ dsg.close() ;
+
+ DatasetGraphTxn dsg2 = sConn.begin(ReadWrite.READ) ;
+ assertTrue(dsg2.contains(q)) ;
+ dsg2.close() ;
+ }
}
Modified: incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java Wed Jul 13 20:56:24 2011
@@ -9,6 +9,7 @@ package tx;
import org.openjena.atlas.lib.Bytes ;
import org.openjena.atlas.lib.FileOps ;
import org.openjena.atlas.logging.Log ;
+import org.openjena.riot.RiotWriter ;
import com.hp.hpl.jena.graph.Graph ;
import com.hp.hpl.jena.query.DatasetFactory ;
@@ -20,6 +21,7 @@ import com.hp.hpl.jena.query.Syntax ;
import com.hp.hpl.jena.rdf.model.Model ;
import com.hp.hpl.jena.rdf.model.ModelFactory ;
import com.hp.hpl.jena.sparql.core.DatasetGraph ;
+import com.hp.hpl.jena.sparql.sse.SSE ;
import com.hp.hpl.jena.sparql.util.QueryExecUtils ;
import com.hp.hpl.jena.tdb.DatasetGraphTxn ;
import com.hp.hpl.jena.tdb.ReadWrite ;
@@ -58,34 +60,83 @@ public class TxMain
initFS() ;
StoreConnection sConn = StoreConnection.make(DBdir) ;
+ // ---- Simple
+ if ( false )
+ {
+ DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
+ //load("D.ttl", dsg) ; // Loads 3 triples
+ dsg.getDefaultGraph().add(SSE.parseTriple("(<s> <p> <o>)")) ;
+ dsg.add(SSE.parseQuad("(<g> <s> <p> 123)")) ;
+ dsg.add(SSE.parseQuad("(_ <s> <p> 123)")) ;
+
+ dsg.commit() ;
+ dsg.close() ;
+ DatasetGraphTxn dsgRead = sConn.begin(ReadWrite.READ) ;
+ dump(dsgRead) ;
+ query("SELECT ?g (count(*) AS ?C) { { ?s ?p ?o } UNION { GRAPH ?g { ?s ?p ?o } } } GROUP BY ?g", dsgRead) ;
+ dsgRead.close() ;
+ exit(0) ;
+ }
+
+ if ( false )
+ {
+ // Blocking transaction
+ DatasetGraphTxn dsgRead = sConn.begin(ReadWrite.READ) ;
+
+
+ DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ; // Optional label.
+ //load("D.ttl", dsg) ; // Loads 3 triples
+ dsg.getDefaultGraph().add(SSE.parseTriple("(<s> <p> <o>)")) ;
+ dsg.add(SSE.parseQuad("(<g> <s> <p> 123)")) ;
+ dsg.add(SSE.parseQuad("(_ <s> <p> 123)")) ;
+ dsg.commit() ;
+ dsg.close() ;
+
+ dump(dsgRead) ;
+ query("SELECT ?g (count(*) AS ?C) { { ?s ?p ?o } UNION { GRAPH ?g { ?s ?p ?o } } } GROUP BY ?g", dsgRead) ;
+ dsgRead.close() ;
+ DatasetGraphTxn dsgRead2 = sConn.begin(ReadWrite.READ) ;
+ query("SELECT ?g (count(*) AS ?C) { { ?s ?p ?o } UNION { GRAPH ?g { ?s ?p ?o } } } GROUP BY ?g", dsgRead2) ;
+ dsgRead2.close() ;
+ exit(0) ;
+ }
+
+ // BUG somewhere.
+ // Check DevTx list of things to do.
// Take a blocking read connection.
- DatasetGraphTxn dsgRead = sConn.begin(ReadWrite.READ) ; // Optional label.
+ DatasetGraphTxn dsgRead = sConn.begin(ReadWrite.READ) ; //dsgRead.close() ;
DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
load("D.ttl", dsg) ; // Loads 3 triples
+ query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead) ;
query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsg) ;
-
dsg.commit() ;
dsg.close() ;
+ dsg = null ;
+ // Reader after update.
+ // First reader still reading.
- // Reader still open.
- // At this point, there is a blocking
-
- dsg = sConn.begin(ReadWrite.WRITE) ;
- load("D1.ttl", dsg) ; // Loads 1 triples
-
- query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsg) ;
+ //DatasetGraphTxn dsgRead2 = sConn.begin(ReadWrite.READ) ;
+ //query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead2) ;
query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead) ;
-
-
- dsg.commit() ;
- dsg.close() ;
- DatasetGraphTxn dsgRead2 = sConn.begin(ReadWrite.READ) ;
- query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead2) ;
- dsgRead2.close() ;
-
+ // A writer.
+ DatasetGraphTxn dsg2 = sConn.begin(ReadWrite.WRITE) ;
+ load("D1.ttl", dsg2) ; // Loads 1 triple
+ query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsg2) ;
+ //query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead2) ;
+ query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead) ;
+ dsg2.commit() ;
+ dsg2.close() ;
+ dsg2 = null ;
+
+ //dsgRead2.close() ;
+
+// DatasetGraphTxn dsgRead2 = sConn.begin(ReadWrite.READ) ;
+// query("SELECT (count(*) AS ?C) { ?s ?p ?o }", dsgRead2) ;
+// dsgRead2.close() ;
+
dsgRead.close() ; // Transaction can now write changes to the real DB.
// ILLEGAL!!!!
@@ -117,6 +168,11 @@ public class TxMain
return dsg ;
}
+ public static void dump(DatasetGraphTxn dsg)
+ {
+ RiotWriter.writeNQuads(System.out, dsg) ;
+ }
+
public static void query(String queryStr, DatasetGraphTxn dsg)
{
String x = "Query ("+dsg.getTransaction().getLabel()+")" ;
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java Wed Jul 13 20:56:24 2011
@@ -64,6 +64,14 @@ public class StoreConnection
private static Map<Location, StoreConnection> cache = new HashMap<Location, StoreConnection>() ;
+ public static void reset()
+ {
+ for ( Map.Entry<Location, StoreConnection> e : cache.entrySet() )
+ e.getValue().baseDSG.close() ;
+
+ cache.clear() ;
+ }
+
public static StoreConnection make(Location location)
{
TDBMaker.releaseLocation(location) ;
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferChannel.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferChannel.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferChannel.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferChannel.java Wed Jul 13 20:56:24 2011
@@ -45,7 +45,7 @@ public interface BufferChannel extends S
/** set the position */
public void position(long pos) ;
- /** Read into a ByteBuffer. Returns the number of bytes read.
+ /** Read into a ByteBuffer. Returns the number of bytes read. -1 for end of file.
*/
public int read(ByteBuffer buffer) ;
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java Wed Jul 13 20:56:24 2011
@@ -135,7 +135,13 @@ class Journal implements Iterable<Journa
// UGLY Maybe better to leave some space in the block's byte buffer.
// [TxTDB:TODO] Make robust against partial read.
header.clear() ;
- channel.read(header) ;
+ int lenRead = channel.read(header) ;
+ if ( lenRead == -1 )
+ {
+ // probably broken file.
+ throw new TDBTransactionException("Read off the end of a journal file") ;
+ //return null ;
+ }
header.rewind() ;
int typeId = header.getInt() ;
int len = header.getInt() ;
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java Wed Jul 13 20:56:24 2011
@@ -134,7 +134,16 @@ public class JournalControl
FileOps.delete(objFilename) ;
}
- public static void replay(Journal journal, DatasetGraphTDB dsg)
+ public static void replay(Transaction transaction)
+ {
+ Journal journal = transaction.getJournal() ;
+ DatasetGraphTDB dsg = transaction.getBaseDataset() ;
+ replay(journal, dsg) ;
+// Iterator<Transactional> iter = transaction.components() ;
+// xxxxxxxxxxxxxxxx
+ }
+
+ private static void replay(Journal journal, DatasetGraphTDB dsg)
{
journal.position(0) ;
dsg.getLock().enterCriticalSection(Lock.WRITE) ;
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java Wed Jul 13 20:56:24 2011
@@ -43,6 +43,12 @@ public class Transaction
if (label == null )
label = "Txn" ;
label = label+"["+id+"]" ;
+ switch(mode)
+ {
+ case READ : label = label+"/R" ; break ;
+ case WRITE : label = label+"/W" ; break ;
+ }
+
this.label = label ;
this.txnMgr = txnMgr ;
this.basedsg = dsg ;
@@ -61,7 +67,7 @@ public class Transaction
{
if ( state != TxnState.ACTIVE )
throw new TDBTransactionException("Transaction has already committed or aborted") ;
-
+ prepare() ;
JournalEntry entry = new JournalEntry(JournalEntryType.Commit, FileRef.Journal, null) ;
journal.writeJournal(entry) ;
journal.sync() ; // Commit point.
@@ -73,13 +79,7 @@ public class Transaction
private void prepare()
{
- if ( mode == ReadWrite.READ )
- return ;
-
- if ( state != TxnState.ACTIVE )
- throw new TDBTransactionException("Transaction has already committed or aborted") ;
state = TxnState.PREPARING ;
-
for ( BlockMgrJournal x : blkMgrs )
x.commitPrepare(this) ;
for ( NodeTableTrans x : nodeTableTrans )
@@ -110,6 +110,7 @@ public class Transaction
for ( NodeTableTrans x : nodeTableTrans )
x.abort(this) ;
+ // [TxTDB:TODO] : No - truncates a pending transaction.
journal.truncate(0) ;
state = TxnState.ABORTED ;
@@ -125,6 +126,7 @@ public class Transaction
{
switch(state)
{
+ case CLOSED: return ; // Can call close() repeatedly.
case ACTIVE:
if ( mode == ReadWrite.READ )
commit() ;
@@ -151,18 +153,8 @@ public class Transaction
}
public ReadWrite getMode() { return mode ; }
- public TxnState getState() { return state ; }
+ public TxnState getState() { return state ; }
- List<NodeTableTrans> getNodeTableTrans()
- {
- return nodeTableTrans ;
- }
-
- List<BlockMgrJournal> getBlkMgrs()
- {
- return blkMgrs ;
- }
-
public long getTxnId() { return id ; }
TransactionManager getTxnMgr() { return txnMgr ; }
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java?rev=1146468&r1=1146467&r2=1146468&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java Wed Jul 13 20:56:24 2011
@@ -7,6 +7,7 @@
package com.hp.hpl.jena.tdb.transaction;
import static com.hp.hpl.jena.tdb.ReadWrite.READ ;
+import static com.hp.hpl.jena.tdb.sys.SystemTDB.syslog ;
import static java.lang.String.format ;
import java.util.ArrayList ;
@@ -19,6 +20,7 @@ import java.util.concurrent.LinkedBlocki
import org.openjena.atlas.logging.Log ;
import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
import com.hp.hpl.jena.tdb.DatasetGraphTxn ;
import com.hp.hpl.jena.tdb.ReadWrite ;
@@ -30,7 +32,7 @@ public class TransactionManager
// TODO Don't keep counter, keep lists.
// TODO Useful logging.
- private static Logger log = SystemTDB.syslog ;
+ private static Logger log = LoggerFactory.getLogger(TransactionManager.class) ;
private Set<Transaction> activeTransactions = new HashSet<Transaction>() ;
@@ -105,7 +107,6 @@ public class TransactionManager
if ( ! commitedAwaitingFlush.isEmpty() )
dsg = commitedAwaitingFlush.get(commitedAwaitingFlush.size()-1).getActiveDataset() ;
-
Transaction txn = createTransaction(dsg, mode, label) ;
DatasetGraphTxn dsgTxn = (DatasetGraphTxn)new DatasetBuilderTxn(this).build(txn, mode, dsg) ;
txn.setActiveDataset(dsgTxn) ;
@@ -116,16 +117,14 @@ public class TransactionManager
iter.next().begin(dsgTxn.getTransaction()) ;
activeTransactions.add(txn) ;
- if ( log.isDebugEnabled() )
- log.debug("begin: "+txn) ;
+ log("begin",txn) ;
return dsgTxn ;
}
synchronized
public void notifyCommit(Transaction transaction)
{
- if ( log.isDebugEnabled() )
- log.debug("commit: "+transaction) ;
+ log("commit", transaction) ;
// Transaction has done the commitPrepare - can we enact it?
@@ -134,44 +133,44 @@ public class TransactionManager
endTransaction(transaction) ;
- if ( readers == 0 && transaction.getMode() == ReadWrite.WRITE )
- // New readers blocked from starting by the synchronized here and on begin.
- JournalControl.replay(transaction.getJournal(), transaction.getBaseDataset()) ;
- else
+ if ( transaction.getMode() == ReadWrite.WRITE )
{
- commitedAwaitingFlush.add(transaction) ;
- if ( log.isDebugEnabled() )
- log.info("Commit blocked at the moment") ;
- queue.add(transaction) ;
+ if ( readers == 0 )
+ // Can commit immediately.
+ commitTransaction(transaction) ;
+ else
+ {
+ // Can't make permanent at the moment.
+ commitedAwaitingFlush.add(transaction) ;
+ log.debug("Commit pending: "+transaction.getLabel());
+
+ //if ( log.isDebugEnabled() )
+ // log.debug("Commit blocked at the moment") ;
+ queue.add(transaction) ;
+ }
}
}
private void commitTransaction(Transaction transaction)
{
// Really, really do it!
- for ( BlockMgrJournal x : transaction.getBlkMgrs() )
- {
- x.commitEnact(transaction) ;
- x.clearup(transaction) ;
- }
-
- for ( NodeTableTrans x : transaction.getNodeTableTrans() )
+ Iterator<Transactional> iter = transaction.components() ;
+ for ( ; iter.hasNext() ; )
{
+ Transactional x = iter.next() ;
x.commitEnact(transaction) ;
x.clearup(transaction) ;
}
-
// This cleans up as well.
- JournalControl.replay(transaction.getJournal(), transaction.getBaseDataset()) ;
+ JournalControl.replay(transaction) ;
}
synchronized
public void notifyAbort(Transaction transaction)
- {
+ {
+ log("abort", transaction) ;
// Transaction has done the abort on all the transactional elements.
// TODO Suppose the system journal has
- if ( log.isDebugEnabled() )
- log.info("abort: "+transaction) ;
if ( ! activeTransactions.contains(transaction) )
SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ;
endTransaction(transaction) ;
@@ -180,13 +179,12 @@ public class TransactionManager
synchronized
public void notifyClose(Transaction txn)
{
- if ( log.isDebugEnabled() )
- log.debug("notifyClose: "+txn) ;
+ log("close", txn) ;
if ( txn.getState() == TxnState.ACTIVE )
{
String x = txn.getBaseDataset().getLocation().getDirectoryPath() ;
- SystemTDB.syslog.warn("close: Transaction not commited or aborted: Transaction: "+txn.getTxnId()+" @ "+x) ;
+ syslog.warn("close: Transaction not commited or aborted: Transaction: "+txn.getTxnId()+" @ "+x) ;
txn.abort() ;
}
@@ -202,10 +200,9 @@ public class TransactionManager
Transaction txn2 = queue.take() ;
if ( txn2.getMode() == READ )
continue ;
- log.info("Delayed commit") ;
+ log("Delayed commit", txn2) ;
// This takes a Write lock on the DSG - this is where it blocks.
- JournalControl.replay(txn2.getJournal(), txn2.getBaseDataset()) ;
- log.info("Delayed commit succeeded") ;
+ JournalControl.replay(txn2) ;
commitedAwaitingFlush.remove(txn) ;
} catch (InterruptedException ex)
{ Log.fatal(this, "Interruped!", ex) ; }
@@ -213,8 +210,7 @@ public class TransactionManager
}
else
{
- if ( log.isDebugEnabled() )
- log.debug(format("Pending transactions: R=%d / W=%d", readers, writers)) ;
+ if ( log() ) log(format("Pending transactions: R=%d / W=%d", readers, writers), txn) ;
}
}
@@ -227,6 +223,21 @@ public class TransactionManager
activeTransactions.remove(transaction) ;
}
+
+ private boolean log()
+ {
+ return syslog.isDebugEnabled() || log.isDebugEnabled() ;
+ }
+
+ private void log(String msg, Transaction txn)
+ {
+ if ( ! log() )
+ return ;
+ if ( syslog.isDebugEnabled() )
+ syslog.debug(txn.getLabel()+": "+msg) ;
+ else
+ log.debug(txn.getLabel()+": "+msg) ;
+ }
// LATER.
class Committer implements Runnable
@@ -244,7 +255,7 @@ public class TransactionManager
Transaction txn = queue.take() ;
System.out.println("Async commit") ;
// This takes a Write lock on the DSG - this is where it blocks.
- JournalControl.replay(txn.getJournal(), txn.getBaseDataset()) ;
+ JournalControl.replay(txn) ;
System.out.println("Async commit succeeded") ;
synchronized(TransactionManager.this)
{