You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2011/08/16 20:43:38 UTC
svn commit: r1158418 - in /incubator/jena/Experimental/TxTDB/trunk/src:
main/java/com/hp/hpl/jena/tdb/ main/java/com/hp/hpl/jena/tdb/transaction/
test/java/com/hp/hpl/jena/tdb/transaction/
Author: andy
Date: Tue Aug 16 18:43:37 2011
New Revision: 1158418
URL: http://svn.apache.org/viewvc?rev=1158418&view=rev
Log:
Abort didn't clean up disk files correctly.
Modified:
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/NodeTableTrans.java
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java
incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java?rev=1158418&r1=1158417&r2=1158418&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java Tue Aug 16 18:43:37 2011
@@ -107,6 +107,9 @@ public class StoreConnection
return ;
if ( ! force && sConn.transactionManager.activeTransactions() )
throw new TDBTransactionException("Can't expel: Active transactions for location: "+location) ;
+
+ // No transactions at this point (or we don't care and are clearing up forcefully.)
+ sConn.transactionManager.closedown() ;
sConn.baseDSG.close() ;
cache.remove(location) ;
}
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/NodeTableTrans.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/NodeTableTrans.java?rev=1158418&r1=1158417&r2=1158418&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/NodeTableTrans.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/NodeTableTrans.java Tue Aug 16 18:43:37 2011
@@ -47,6 +47,9 @@ public class NodeTableTrans implements N
private final Index nodeIndex ;
private ObjectFile journal ;
+ // Start of the journal file for this transaction.
+ // Always zero currently, but allows for a non-zero start offset in future.
+ private long journalStartOffset ;
private final String label ;
public NodeTableTrans(String label, NodeTable sub, Index nodeIndex, ObjectFile journal)
@@ -123,10 +126,9 @@ public class NodeTableTrans implements N
offset = base.allocOffset().getId() ;
// Any outstanding transactions
- int offset2 = (int)journal.length() ;
+ long journalOffset = journal.length() ;
debug("begin: %s", label) ;
- offset += offset2 ;
-
+ offset += journalOffset ;
this.nodeTableJournal = new NodeTableNative(nodeIndex, journal) ;
this.nodeTableJournal = NodeTableCache.create(nodeTableJournal, CacheSize, CacheSize) ;
// Do not add the inline NodeTable here - don't convert its values by the offset!
@@ -144,7 +146,7 @@ public class NodeTableTrans implements N
Node node = x.getRight() ;
NodeId nodeId2 = base.getAllocateNodeId(node) ;
if ( ! nodeId2.equals(mapFromJournal(nodeId)) )
- throw new TDBException(String.format("Different ids allocated: expected %s, got %s\n", nodeId, nodeId2)) ;
+ throw new TDBException(String.format("Different ids allocated: expected %s, got %s\n", mapFromJournal(nodeId), nodeId2)) ;
}
}
@@ -162,16 +164,18 @@ public class NodeTableTrans implements N
@Override
public void commitEnact(Transaction txn)
{
+ // The work was done in commitPrepare, using the fact that the node data file
+ // is append-only. Pointers to the extra data aren't available
+ // until the index is written.
debug("commitEnact: %s", label) ;
//writeJournal() ;
-
}
private void writeNodeJournal()
{
append() ;
nodeIndex.clear() ;
- journal.truncate(0) ;
+ journal.truncate(journalStartOffset) ;
journal.sync() ;
base.sync() ;
offset = base.allocOffset().getId() ;
@@ -190,6 +194,10 @@ public class NodeTableTrans implements N
{
if ( nodeTableJournal == null )
throw new TDBTransactionException("Not in a transaction for a commit to happen") ;
+ // Ensure the cache does not flush.
+ nodeTableJournal = null ;
+ // then make sure the journal file is empty.
+ journal.truncate(journalStartOffset) ;
finish() ;
}
@@ -214,6 +222,7 @@ public class NodeTableTrans implements N
@Override
public void close()
{
+ // Closing the journal flushes it; i.e. disk IO.
if ( journal != null )
journal.close() ;
journal = null ;
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java?rev=1158418&r1=1158417&r2=1158418&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java Tue Aug 16 18:43:37 2011
@@ -251,6 +251,12 @@ public class TransactionManager
// committerThread.start() ;
}
+ public void closedown()
+ {
+ processDelayedReplayQueue(null) ;
+ journal.close() ;
+ }
+
private Transaction createTransaction(DatasetGraphTDB dsg, ReadWrite mode, String label)
{
Transaction txn = new Transaction(dsg, mode, transactionId.getAndIncrement(), label, this) ;
@@ -372,8 +378,13 @@ public class TransactionManager
// [TxTDB:TODO]
if ( activeReaders.get() != 0 || activeWriters.get() != 0 )
{
- if ( queue.size() > 0 )
- if ( log() ) log(format("Pending transactions: R=%s / W=%s", activeReaders, activeWriters), txn) ;
+ if ( queue.size() > 0 && log() )
+ {
+ if ( txn != null )
+ log(format("Pending transactions: R=%s / W=%s", activeReaders, activeWriters), txn) ;
+ else
+ logger().debug(format("Pending transactions: R=%s / W=%s", activeReaders, activeWriters)) ;
+ }
return ;
}
// if ( queue.size() > 1 )
@@ -492,7 +503,7 @@ public class TransactionManager
// ---- Recording
- public Journal getJournal()
+ Journal getJournal()
{
return journal ;
}
@@ -506,12 +517,17 @@ public class TransactionManager
{
if ( ! log() )
return ;
+ logger().debug(txn.getLabel()+": "+msg) ;
+ }
+
+ private Logger logger()
+ {
if ( syslog.isDebugEnabled() )
- syslog.debug(txn.getLabel()+": "+msg) ;
+ return syslog ;
else
- log.debug(txn.getLabel()+": "+msg) ;
+ return log ;
}
-
+
synchronized
public SysTxnState state()
{
Modified: incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java?rev=1158418&r1=1158417&r2=1158418&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java Tue Aug 16 18:43:37 2011
@@ -21,7 +21,6 @@ package com.hp.hpl.jena.tdb.transaction;
import org.junit.AfterClass ;
import org.junit.BeforeClass ;
-import org.junit.Ignore ;
import org.junit.Test ;
import org.openjena.atlas.junit.BaseTest ;
import org.openjena.atlas.logging.Log ;
@@ -243,10 +242,38 @@ public abstract class AbstractTestTransS
assertTrue(dsg.contains(q2)) ;
}
-
- @Test @Ignore // Maybe a false test?
+ @Test
public void trans_readBlock_04()
{
+ // READ(block)-WRITE(abort)-WRITE(commit)-READ(close)-check
+ StoreConnection sConn = getStoreConnection() ;
+ DatasetGraphTxn dsgR1 = sConn.begin(ReadWrite.READ) ;
+
+ DatasetGraphTxn dsgW2 = sConn.begin(ReadWrite.WRITE) ;
+ dsgW2.add(q2) ;
+ dsgW2.abort() ; // ABORT
+ dsgW2.close() ;
+ assertFalse(dsgR1.contains(q2)) ;
+
+ DatasetGraphTxn dsgW3 = sConn.begin(ReadWrite.WRITE) ;
+ dsgW3.add(q3) ;
+ // Cannot see q2: W2 was aborted.
+ assertFalse(dsgW3.contains(q2)) ;
+ dsgW3.commit() ;
+ dsgW3.close() ;
+ assertFalse(dsgR1.contains(q3)) ;
+
+ dsgR1.close() ;
+
+ DatasetGraph dsg = sConn.getBaseDataset() ;
+ assertFalse(dsg.contains(q2)) ;
+ assertTrue(dsg.contains(q3)) ;
+ }
+
+
+ @Test
+ public void trans_readBlock_05()
+ {
// READ(block)-WRITE(commit)-WRITE(abort)-WRITE(commit)-READ(close)-check
StoreConnection sConn = getStoreConnection() ;
DatasetGraphTxn dsgR1 = sConn.begin(ReadWrite.READ) ;
@@ -280,7 +307,7 @@ public abstract class AbstractTestTransS
assertTrue(dsg.contains(q3)) ;
}
- @Test public void trans_readBlock_05()
+ @Test public void trans_readBlock_06()
{
// WRITE(start)-READ(start)-WRITE(commit)-READ sees old DSG.
// READ before WRITE remains seeing old view - READ after WRITE starts
@@ -303,7 +330,7 @@ public abstract class AbstractTestTransS
assertTrue(dsg.contains(q)) ;
}
- @Test public void trans_readBlock_06()
+ @Test public void trans_readBlock_07()
{
// WRITE(start)-READ(start)-add-WRITE(commit)-READ sees old DSG.
// READ before WRITE remains seeing old view - READ after WRITE starts
@@ -327,7 +354,7 @@ public abstract class AbstractTestTransS
assertTrue(dsg.contains(q)) ;
}
- @Test public void trans_readBlock_07()
+ @Test public void trans_readBlock_08()
{
// WRITE(start)-add-READ(start)-WRITE(commit)-READ sees old DSG.
StoreConnection sConn = getStoreConnection() ;
@@ -351,8 +378,7 @@ public abstract class AbstractTestTransS
assertTrue(dsg.contains(q)) ;
}
-
- @Test public void trans_readBlock_08()
+ @Test public void trans_readBlock_09()
{
// WRITE(commit)-READ(start)-WRITE(commit)
StoreConnection sConn = getStoreConnection() ;
@@ -382,7 +408,7 @@ public abstract class AbstractTestTransS
assertTrue(dsg.contains(q2)) ;
}
- @Test public void trans_readBlock_09()
+ @Test public void trans_readBlock_10()
{
// READ(start)-WRITE(start)-WRITE(finish)-WRITE(start)-READ(finish)-WRITE(finish)-check
StoreConnection sConn = getStoreConnection() ;
Modified: incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java?rev=1158418&r1=1158417&r2=1158418&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java Tue Aug 16 18:43:37 2011
@@ -52,10 +52,11 @@ public class TestTransSystem
static { org.openjena.atlas.logging.Log.setLog4j() ; }
private static Logger log = LoggerFactory.getLogger(TestTransSystem.class) ;
- static final Location LOC = new Location(ConfigTest.getTestingDirDB()) ;
- //static final Location LOC = Location.mem() ;
+ static boolean MEM = false ;
+
+ static final Location LOC = MEM ? Location.mem() : new Location(ConfigTest.getTestingDirDB()) ;
- static final int Iterations = 100 ;
+ static final int Iterations = MEM ? 1000 : 100 ;
// Output style.
static boolean inlineProgress = true ; // (! log.isDebugEnabled()) && Iterations > 20 ;
static boolean logging = ! inlineProgress ; // (! log.isDebugEnabled()) && Iterations > 20 ;
@@ -91,7 +92,9 @@ public class TestTransSystem
public static void main(String...args)
{
if ( logging )
- log.info("START");
+ log.info("START ("+ (MEM?"memory":"disk") + ", {} iterations)", Iterations) ;
+ else
+ printf("START (%s, %d iterations)\n", (MEM?"memory":"disk"), Iterations) ;
int N = (Iterations < 10) ? 1 : Iterations / 10 ;
N = Math.min(N, 100) ;
@@ -122,10 +125,13 @@ public class TestTransSystem
}
if (logging)
log.info("FINISH ({})", i) ;
+ else
+ printf("FINISH") ;
}
private static void clean()
{
+ StoreConnection.release(LOC) ;
if ( ! LOC.isMem() )
FileOps.clearDirectory(LOC.getDirectoryPath()) ;
}
@@ -271,8 +277,6 @@ public class TestTransSystem
private StoreConnection sConn ;
protected synchronized StoreConnection getStoreConnection()
{
- if ( LOC.isMem() )
- StoreConnection.reset() ;
StoreConnection sConn = StoreConnection.make(LOC) ;
//sConn.getTransMgr().recording(true) ;
return sConn ;
@@ -381,8 +385,7 @@ public class TestTransSystem
private static void printf(String string, Object...args)
{
- if ( inlineProgress )
- System.out.printf(string, args) ;
+ System.out.printf(string, args) ;
}
private ExecutorService execService = Executors.newCachedThreadPool() ;