You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2011/07/27 23:03:05 UTC
svn commit: r1151627 - in /incubator/jena/Experimental/TxTDB/trunk: ./
resources2/ src/main/java/com/hp/hpl/jena/tdb/transaction/
src/test/java/com/hp/hpl/jena/tdb/transaction/
Author: andy
Date: Wed Jul 27 21:03:03 2011
New Revision: 1151627
URL: http://svn.apache.org/viewvc?rev=1151627&view=rev
Log: (empty)
Modified:
incubator/jena/Experimental/TxTDB/trunk/log4j.properties
incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java
incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java
Modified: incubator/jena/Experimental/TxTDB/trunk/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/log4j.properties?rev=1151627&r1=1151626&r2=1151627&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/log4j.properties (original)
+++ incubator/jena/Experimental/TxTDB/trunk/log4j.properties Wed Jul 27 21:03:03 2011
@@ -1,7 +1,7 @@
log4j.rootLogger=INFO, stdlog
log4j.appender.stdlog=org.apache.log4j.ConsoleAppender
-## log4j.appender.stdlog.target=System.err
+log4j.appender.stdlog.target=System.out
log4j.appender.stdlog.layout=org.apache.log4j.PatternLayout
log4j.appender.stdlog.layout.ConversionPattern=%d{HH:mm:ss} %-5p %-25c{1} :: %m%n
@@ -17,8 +17,11 @@ log4j.logger.com.hp.hpl.jena=WARN
log4j.logger.org.openjena.riot=INFO
# TDB
+# TDB syslog.
+log4j.logger.TDB=INFO
+
log4j.logger.com.hp.hpl.jena.tdb=INFO
-#log4j.logger.com.hp.hpl.jena.tdb.transaction=ALL
+log4j.logger.com.hp.hpl.jena.tdb.transaction=ALL
# Joseki server
log4j.logger.org.joseki=INFO
Modified: incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties?rev=1151627&r1=1151626&r2=1151627&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties (original)
+++ incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties Wed Jul 27 21:03:03 2011
@@ -16,5 +16,9 @@ log4j.logger.com.hp.hpl.jena.tdb.loader=
log4j.logger.com.hp.hpl.jena=WARN
log4j.logger.org.openjena.riot=INFO
+# TDB
+log4j.logger.com.hp.hpl.jena.tdb=INFO
+log4j.logger.com.hp.hpl.jena.tdb.transaction=ALL
+
# Joseki server
log4j.logger.org.joseki=INFO
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java?rev=1151627&r1=1151626&r2=1151627&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java Wed Jul 27 21:03:03 2011
@@ -60,12 +60,12 @@ public class JournalControl
// Do we need to recover?
Journal journal = findJournal(dsg) ;
- if ( journal != null )
- {
- for ( FileRef fileRef : dsg.getConfig().nodeTables.keySet() )
- recoverNodeDat(dsg, fileRef) ;
- recoverSystemJournal(journal, dsg) ;
- }
+ if ( journal == null )
+ return ;
+
+ for ( FileRef fileRef : dsg.getConfig().nodeTables.keySet() )
+ recoverNodeDat(dsg, fileRef) ;
+ recoverSystemJournal(journal, dsg) ;
// Recovery complete. Tidy up. Node journal files have already been handled.
if ( journal.getFilename() != null )
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java?rev=1151627&r1=1151626&r2=1151627&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java Wed Jul 27 21:03:03 2011
@@ -43,13 +43,30 @@ public class TransactionManager
static long transactionId = 1 ;
- private int readers = 0 ;
- private int writers = 0 ; // 0 or 1
+ private int activeReaders = 0 ;
+ private int activeWriters = 0 ; // 0 or 1
// Misc stats
private int finishedReads = 0 ;
private int committedWrite = 0 ;
private int abortedWrite = 0 ;
+
+ public static class State
+ {
+ final public int activeReaders ;
+ final public int activeWriters ;
+ final public int finishedReads ;
+ final public int committedWrite ;
+ final public int abortedWrite ;
+ State(TransactionManager tm)
+ {
+ activeReaders = tm.activeReaders ;
+ activeWriters = tm.activeWriters ;
+ finishedReads = tm.finishedReads ;
+ committedWrite = tm.committedWrite ;
+ abortedWrite = tm.abortedWrite ;
+ }
+ }
private BlockingQueue<Transaction> queue = new LinkedBlockingDeque<Transaction>() ;
@@ -97,9 +114,9 @@ public class TransactionManager
// }
switch (mode)
{
- case READ : readers++ ; break ;
+ case READ : activeReaders++ ; break ;
case WRITE :
- int x = writers++ ;
+ int x = activeWriters++ ;
if ( x > 0 )
throw new TDBTransactionException("Existing active write transaction") ;
break ;
@@ -140,21 +157,25 @@ public class TransactionManager
endTransaction(transaction) ;
- if ( transaction.getMode() == ReadWrite.WRITE )
+ switch ( transaction.getMode() )
{
- if ( readers == 0 )
- // Can commit imemdiately.
- commitTransaction(transaction) ;
- else
- {
- // Can't make permentent at the moment.
- commitedAwaitingFlush.add(transaction) ;
- //log.debug("Commit pending: "+transaction.getLabel());
-
- //if ( log.isDebugEnabled() )
- // log.debug("Commit blocked at the moment") ;
- queue.add(transaction) ;
- }
+ case READ:
+ endOfRead(transaction) ;
+ break ;
+ case WRITE:
+ if ( activeReaders == 0 )
+ // Can commit immediately.
+ commitTransaction(transaction) ;
+ else
+ {
+ // Can't make permanent at the moment.
+ commitedAwaitingFlush.add(transaction) ;
+ log.debug("Commit flush: "+transaction.getLabel());
+ //if ( log.isDebugEnabled() )
+ // log.debug("Commit blocked at the moment") ;
+ queue.add(transaction) ;
+ }
+ committedWrite ++ ;
}
}
@@ -179,9 +200,51 @@ public class TransactionManager
// Transaction has done the abort on all the transactional elements.
if ( ! activeTransactions.contains(transaction) )
SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ;
+
endTransaction(transaction) ;
+
+ switch ( transaction.getMode() )
+ {
+ case READ:
+ endOfRead(transaction) ;
+ break ;
+ case WRITE:
+ // Journal cleaned in Transaction.abort.
+ abortedWrite ++ ;
+ }
+ }
+
+ /** READ specific final actions. */
+ private void endOfRead(Transaction transaction)
+ {
+ processDelayedReplyQueue(transaction) ;
+ finishedReads ++ ;
}
+ private void processDelayedReplyQueue(Transaction txn)
+ {
+ if ( activeReaders != 0 || activeWriters != 0 )
+ {
+ if ( queue.size() > 0 )
+ if ( log() ) log(format("Pending transactions: R=%d / W=%d", activeReaders, activeWriters), txn) ;
+ return ;
+ }
+ while ( queue.size() > 0 )
+ {
+ try {
+ Transaction txn2 = queue.take() ;
+
+ if ( txn2.getMode() == READ )
+ continue ;
+ log("Flush delayed commit", txn2) ;
+ // This takes a Write lock on the DSG - this is where it blocks.
+ JournalControl.replay(txn2) ;
+ commitedAwaitingFlush.remove(txn2) ;
+ } catch (InterruptedException ex)
+ { Log.fatal(this, "Interruped!", ex) ; }
+ }
+ }
+
synchronized
public void notifyClose(Transaction txn)
{
@@ -192,40 +255,16 @@ public class TransactionManager
String x = txn.getBaseDataset().getLocation().getDirectoryPath() ;
syslog.warn("close: Transaction not commited or aborted: Transaction: "+txn.getTxnId()+" @ "+x) ;
txn.abort() ;
- }
-
- // Process any pending commits held up due to a reader.
- if ( readers == 0 && writers == 0 )
- {
- // Given this is sync'ed to this TransactionManager,
- // the query never blocks, nor does it need to be concurrent-safe.
- // later ...
- while ( queue.size() > 0 )
- {
- try {
- Transaction txn2 = queue.take() ;
- if ( txn2.getMode() == READ )
- continue ;
- log.info("Delayed commit", txn2) ;
- // This takes a Write lock on the DSG - this is where it blocks.
- JournalControl.replay(txn2) ;
- commitedAwaitingFlush.remove(txn) ;
- } catch (InterruptedException ex)
- { Log.fatal(this, "Interruped!", ex) ; }
- }
- }
- else
- {
- if ( log() ) log(format("Pending transactions: R=%d / W=%d", readers, writers), txn) ;
+ return ;
}
}
-
+
private void endTransaction(Transaction transaction)
{
if ( transaction.getMode() == READ )
- readers-- ;
+ activeReaders-- ;
else
- writers-- ;
+ activeWriters-- ;
activeTransactions.remove(transaction) ;
}
@@ -249,6 +288,10 @@ public class TransactionManager
log.debug(txn.getLabel()+": "+msg) ;
}
+ synchronized
+ public State state()
+ { return new State(this) ; }
+
// LATER.
class Committer implements Runnable
{
Modified: incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java?rev=1151627&r1=1151626&r2=1151627&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java Wed Jul 27 21:03:03 2011
@@ -19,9 +19,12 @@
package com.hp.hpl.jena.tdb.transaction;
+import java.util.Iterator ;
+
import org.junit.Test ;
import org.openjena.atlas.junit.BaseTest ;
+import com.hp.hpl.jena.graph.Node ;
import com.hp.hpl.jena.sparql.core.Quad ;
import com.hp.hpl.jena.sparql.sse.SSE ;
import com.hp.hpl.jena.tdb.DatasetGraphTxn ;
@@ -98,10 +101,34 @@ public abstract class AbstractTestTransS
@Test public void trans_05()
{
- // READ(block)-WRITE-commit-WRITE-abort-WRITE-commit
+ // READ before WRITE remains seeing old view - READ before WRITE starts
+ StoreConnection sConn = getStoreConnection() ;
+ DatasetGraphTxn dsgR1 = sConn.begin(ReadWrite.READ) ;
+ DatasetGraphTxn dsgW = sConn.begin(ReadWrite.WRITE) ;
+
+ dsgW.add(q) ;
+ dsgW.commit() ;
+ dsgW.close() ;
+
+ assertFalse(dsgR1.contains(q)) ;
+ dsgR1.close() ;
+
+ DatasetGraphTxn dsgR2 = sConn.begin(ReadWrite.READ) ;
+ assertTrue(dsgR2.contains(q)) ;
+ dsgR2.close() ;
+ }
+
+ @Test public void trans_06()
+ {
+ // READ(block)-WRITE-commit-WRITE-abort-WRITE-commit-READ(close)-check
StoreConnection sConn = getStoreConnection() ;
DatasetGraphTxn dsgR1 = sConn.begin(ReadWrite.READ) ;
+ // IF
+ // dsgR1.close() ;
+ // THEN it works.
+ // ==> delay replay
+
DatasetGraphTxn dsgW1 = sConn.begin(ReadWrite.WRITE) ;
dsgW1.add(q1) ;
dsgW1.commit() ;
@@ -123,27 +150,13 @@ public abstract class AbstractTestTransS
dsgR1.close() ;
DatasetGraphTxn dsgR2 = sConn.begin(ReadWrite.READ) ;
+
assertTrue(dsgR2.contains(q1)) ;
assertFalse(dsgR2.contains(q2)) ;
assertTrue(dsgR2.contains(q3)) ;
dsgR2.close() ;
}
- @Test public void trans_06()
- {
- // READ before WRITE remains seeing old view - READ before WRITE starts
- StoreConnection sConn = getStoreConnection() ;
- DatasetGraphTxn dsgR = sConn.begin(ReadWrite.READ) ;
- DatasetGraphTxn dsgW = sConn.begin(ReadWrite.WRITE) ;
-
- dsgW.add(q) ;
- dsgW.commit() ;
- dsgW.close() ;
-
- assertFalse(dsgR.contains(q)) ;
- dsgR.close() ;
- }
-
@Test public void trans_07()
{
// READ before WRITE remains seeing old view - READ after WRITE starts
Modified: incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java?rev=1151627&r1=1151626&r2=1151627&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java Wed Jul 27 21:03:03 2011
@@ -19,6 +19,7 @@
package com.hp.hpl.jena.tdb.transaction;
import static com.hp.hpl.jena.tdb.transaction.TransTestLib.count ;
+import static java.lang.String.format ;
import java.util.concurrent.Callable ;
import java.util.concurrent.ExecutorService ;
@@ -30,6 +31,10 @@ import org.junit.AfterClass ;
import org.junit.BeforeClass ;
import org.openjena.atlas.lib.FileOps ;
import org.openjena.atlas.lib.Lib ;
+import org.openjena.atlas.lib.RandomLib ;
+import org.openjena.atlas.logging.Log ;
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
import com.hp.hpl.jena.datatypes.xsd.XSDDatatype ;
import com.hp.hpl.jena.graph.Node ;
@@ -43,23 +48,47 @@ import com.hp.hpl.jena.tdb.base.file.Loc
/** System testing of the transactions. */
public class TestTransSystem
{
+ static { Log.setLog4j() ; }
+ private static Logger log = LoggerFactory.getLogger(TestTransSystem.class) ;
+ static final boolean progress = ! log.isInfoEnabled() ;
+
+
+ static final int Iterations = 1 ;
+ static final int readerSeqRepeats = 10 ;
+ static final int readerMaxPause = 50 ;
+ static final int writerAbortSeqRepeats = 1 ;
+ static final int writerCommitSeqRepeats = 5 ;
+ static final int writerMaxPause = 10 ;
+
public static void main(String...args)
{
- final int N = 100 ;
+
+ final int N = (Iterations < 10) ? 1 : Iterations / 10 ;
int i ;
- for ( i = 0 ; i < 1000 ; i++ )
+ for ( i = 0 ; i < Iterations ; i++ )
{
if ( i%N == 0 )
- System.out.printf("%03d: ",i) ;
- System.out.print(".") ;
+ printf("%03d: ",i) ;
+ printf(".") ;
if ( i%N == (N-1) )
- System.out.println() ;
+ println() ;
new TestTransSystem().manyReaderAndOneWriter() ;
}
if ( i%N != 0 )
System.out.println() ;
- System.out.println() ;
- System.out.printf("DONE (%03d)\n",i) ;
+ println() ;
+ printf("DONE (%03d)\n",i) ;
+ }
+
+ private static void println()
+ {
+ printf("\n") ;
+ }
+
+ private static void printf(String string, Object...args)
+ {
+ if ( progress )
+ System.out.printf(string, args) ;
}
private ExecutorService execService = Executors.newCachedThreadPool() ;
@@ -124,15 +153,15 @@ public class TestTransSystem
final int numOfTasks = 10 ;
final StoreConnection sConn = getStoreConnection() ;
- Callable<?> procR = new Reader(sConn, 10, 50) ; // Number of repeats, max pause
- Callable<?> procW_a = new Writer(sConn, 1, 10, false) // Number of repeats, max pause, commit.
+ Callable<?> procR = new Reader(sConn, readerSeqRepeats, readerMaxPause) ; // Number of repeats, max pause
+ Callable<?> procW_a = new Writer(sConn, writerAbortSeqRepeats, writerMaxPause, false) // Number of repeats, max pause, commit.
{
@Override
protected int change(DatasetGraphTxn dsg, int id, int i)
{ return changeProc(dsg, id, i) ; }
} ;
- Callable<?> procW_c = new Writer(sConn, 5, 10, true) // Number of repeats, max pause, commit.
+ Callable<?> procW_c = new Writer(sConn, writerCommitSeqRepeats, writerMaxPause, true) // Number of repeats, max pause, commit.
{
@Override
protected int change(DatasetGraphTxn dsg, int id, int i)
@@ -158,16 +187,18 @@ public class TestTransSystem
static int changeProc(DatasetGraphTxn dsg, int id, int i)
{
int count = 0 ;
- int N = 5 ;
+ int maxN = 500 ;
+ int N = RandomLib.qrandom.nextInt(maxN) ;
for ( int j = 0 ; j < N; j++ )
{
- Quad q = genQuad(id+j) ;
+ Quad q = genQuad(id*maxN+j) ;
if ( ! dsg.contains(q) )
{
dsg.add(q) ;
count++ ;
}
}
+ log.debug("Change = "+dsg.getDefaultGraph().size()) ;
return count ;
}
@@ -191,11 +222,14 @@ public class TestTransSystem
for ( int i = 0 ; i < repeats; i++ )
{
DatasetGraphTxn dsg = sConn.begin(ReadWrite.READ) ;
+ log.debug("reader start "+id+"/"+i) ;
+
int x1 = count("SELECT * { ?s ?p ?o }", dsg) ;
pause(maxpause) ;
int x2 = count("SELECT * { ?s ?p ?o }", dsg) ;
if ( x1 != x2 )
- System.err.printf("Change seen: id=%d: i=%d\n", id, i) ;
+ log.warn(format("Change seen: id=%d: i=%d\n", id, i)) ;
+ log.debug("reader finish "+id+"/"+i) ;
dsg.close() ;
}
return null ;
@@ -224,6 +258,8 @@ public class TestTransSystem
for ( int i = 0 ; i < repeats ; i++ )
{
DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
+System.err.println("writer "+id+"/"+i) ;
+
int x1 = count("SELECT * { ?s ?p ?o }", dsg) ;
int z = change(dsg, id, i) ;
pause(maxpause) ;