You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2011/07/27 23:03:05 UTC

svn commit: r1151627 - in /incubator/jena/Experimental/TxTDB/trunk: ./ resources2/ src/main/java/com/hp/hpl/jena/tdb/transaction/ src/test/java/com/hp/hpl/jena/tdb/transaction/

Author: andy
Date: Wed Jul 27 21:03:03 2011
New Revision: 1151627

URL: http://svn.apache.org/viewvc?rev=1151627&view=rev
Log: (empty)

Modified:
    incubator/jena/Experimental/TxTDB/trunk/log4j.properties
    incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
    incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java
    incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java

Modified: incubator/jena/Experimental/TxTDB/trunk/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/log4j.properties?rev=1151627&r1=1151626&r2=1151627&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/log4j.properties (original)
+++ incubator/jena/Experimental/TxTDB/trunk/log4j.properties Wed Jul 27 21:03:03 2011
@@ -1,7 +1,7 @@
 log4j.rootLogger=INFO, stdlog
 
 log4j.appender.stdlog=org.apache.log4j.ConsoleAppender
-## log4j.appender.stdlog.target=System.err
+log4j.appender.stdlog.target=System.out
 log4j.appender.stdlog.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdlog.layout.ConversionPattern=%d{HH:mm:ss} %-5p %-25c{1} :: %m%n
 
@@ -17,8 +17,11 @@ log4j.logger.com.hp.hpl.jena=WARN
 log4j.logger.org.openjena.riot=INFO
 
 # TDB 
+# TDB syslog.
+log4j.logger.TDB=INFO
+
 log4j.logger.com.hp.hpl.jena.tdb=INFO
-#log4j.logger.com.hp.hpl.jena.tdb.transaction=ALL
+log4j.logger.com.hp.hpl.jena.tdb.transaction=ALL
 
 # Joseki server
 log4j.logger.org.joseki=INFO

Modified: incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties?rev=1151627&r1=1151626&r2=1151627&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties (original)
+++ incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties Wed Jul 27 21:03:03 2011
@@ -16,5 +16,9 @@ log4j.logger.com.hp.hpl.jena.tdb.loader=
 log4j.logger.com.hp.hpl.jena=WARN
 log4j.logger.org.openjena.riot=INFO
 
+# TDB 
+log4j.logger.com.hp.hpl.jena.tdb=INFO
+log4j.logger.com.hp.hpl.jena.tdb.transaction=ALL
+
 # Joseki server
 log4j.logger.org.joseki=INFO

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java?rev=1151627&r1=1151626&r2=1151627&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java Wed Jul 27 21:03:03 2011
@@ -60,12 +60,12 @@ public class JournalControl
         
         // Do we need to recover?
         Journal journal = findJournal(dsg) ;
-        if ( journal != null )
-        {
-            for ( FileRef fileRef : dsg.getConfig().nodeTables.keySet() )
-                recoverNodeDat(dsg, fileRef) ;
-            recoverSystemJournal(journal, dsg) ;
-        }
+        if ( journal == null )
+            return ;
+        
+        for ( FileRef fileRef : dsg.getConfig().nodeTables.keySet() )
+            recoverNodeDat(dsg, fileRef) ;
+        recoverSystemJournal(journal, dsg) ;
         
         // Recovery complete.  Tidy up.  Node journal files have already been handled.
         if ( journal.getFilename() != null )

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java?rev=1151627&r1=1151626&r2=1151627&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java Wed Jul 27 21:03:03 2011
@@ -43,13 +43,30 @@ public class TransactionManager
     
     static long transactionId = 1 ;
     
-    private int readers = 0 ; 
-    private int writers = 0 ;  // 0 or 1
+    private int activeReaders = 0 ; 
+    private int activeWriters = 0 ;  // 0 or 1
     
     // Misc stats
     private int finishedReads = 0 ;
     private int committedWrite = 0 ;
     private int abortedWrite = 0 ;
+    
+    public static class State
+    {
+        final public int activeReaders ; 
+        final public int activeWriters ;
+        final public int finishedReads ;
+        final public int committedWrite ;
+        final public int abortedWrite ;
+        State(TransactionManager tm)
+        {
+            activeReaders = tm.activeReaders ;
+            activeWriters = tm.activeWriters ;
+            finishedReads = tm.finishedReads ;
+            committedWrite = tm.committedWrite ;
+            abortedWrite = tm.abortedWrite ;
+        }
+    }
 
     private BlockingQueue<Transaction> queue = new LinkedBlockingDeque<Transaction>() ;
 
@@ -97,9 +114,9 @@ public class TransactionManager
 //        }
         switch (mode)
         {
-            case READ : readers++ ; break ;
+            case READ : activeReaders++ ; break ;
             case WRITE :
-                int x = writers++ ;
+                int x = activeWriters++ ;
                 if ( x > 0 )
                     throw new TDBTransactionException("Existing active write transaction") ;
                 break ;
@@ -140,21 +157,25 @@ public class TransactionManager
         
         endTransaction(transaction) ;
         
-        if ( transaction.getMode() == ReadWrite.WRITE )
+        switch ( transaction.getMode() )
         {
-            if ( readers == 0 )
-                // Can commit imemdiately.
-                commitTransaction(transaction) ;
-            else
-            {
-                // Can't make permentent at the moment.
-                commitedAwaitingFlush.add(transaction) ;
-                //log.debug("Commit pending: "+transaction.getLabel()); 
-
-                //if ( log.isDebugEnabled() )
-                //    log.debug("Commit blocked at the moment") ;
-                queue.add(transaction) ;
-            }
+            case READ: 
+                endOfRead(transaction) ;
+                break ;
+            case WRITE:
+                if ( activeReaders == 0 )
+                    // Can commit imemdiately.
+                    commitTransaction(transaction) ;
+                else
+                {
+                    // Can't make permanent at the moment.
+                    commitedAwaitingFlush.add(transaction) ;
+                    log.debug("Commit flush: "+transaction.getLabel()); 
+                    //if ( log.isDebugEnabled() )
+                    //    log.debug("Commit blocked at the moment") ;
+                    queue.add(transaction) ;
+                }
+                committedWrite ++ ;
         }
     }
 
@@ -179,9 +200,51 @@ public class TransactionManager
         // Transaction has done the abort on all the transactional elements.
         if ( ! activeTransactions.contains(transaction) )
             SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ;
+        
         endTransaction(transaction) ;
+        
+        switch ( transaction.getMode() )
+        {
+            case READ:
+                endOfRead(transaction) ;
+                break ;
+            case WRITE:
+                // Journal cleaned in Transaction.abort.
+                abortedWrite ++ ;
+        }
+    }
+    
+    /** READ specific final actions. */
+    private void endOfRead(Transaction transaction)
+    {
+        processDelayedReplyQueue(transaction) ;
+        finishedReads ++ ;
     }
     
+    private void processDelayedReplyQueue(Transaction txn)
+    {
+        if ( activeReaders != 0 || activeWriters != 0 )
+        {
+            if ( queue.size() > 0 )
+                if ( log() ) log(format("Pending transactions: R=%d / W=%d", activeReaders, activeWriters), txn) ;
+            return ;
+        }
+        while ( queue.size() > 0 )
+        {
+            try {
+                Transaction txn2 = queue.take() ;
+               
+                if ( txn2.getMode() == READ )
+                    continue ;
+                log("Flush delayed commit", txn2) ;
+                // This takes a Write lock on the  DSG - this is where it blocks.
+                JournalControl.replay(txn2) ;
+                commitedAwaitingFlush.remove(txn2) ;
+            } catch (InterruptedException ex)
+            { Log.fatal(this, "Interruped!", ex) ; }
+        }
+    }
+
     synchronized
     public void notifyClose(Transaction txn)
     {
@@ -192,40 +255,16 @@ public class TransactionManager
             String x = txn.getBaseDataset().getLocation().getDirectoryPath() ;
             syslog.warn("close: Transaction not commited or aborted: Transaction: "+txn.getTxnId()+" @ "+x) ;
             txn.abort() ;
-        }
-
-        // Process any pending commits held up due to a reader. 
-        if ( readers == 0 && writers == 0 ) 
-        {
-            // Given this is sync'ed to this TransactionManager, 
-            // the query never blocks, nor does it need to be concurrent-safe.
-            // later ...
-            while ( queue.size() > 0 )
-            {
-                try {
-                    Transaction txn2 = queue.take() ;
-                    if ( txn2.getMode() == READ )
-                        continue ;
-                    log.info("Delayed commit", txn2) ;
-                    // This takes a Write lock on the  DSG - this is where it blocks.
-                    JournalControl.replay(txn2) ;
-                    commitedAwaitingFlush.remove(txn) ;
-                } catch (InterruptedException ex)
-                { Log.fatal(this, "Interruped!", ex) ; }
-            }
-        }
-        else
-        {
-            if ( log() ) log(format("Pending transactions: R=%d / W=%d", readers, writers), txn) ;
+            return ;
         }
     }
-    
+        
     private void endTransaction(Transaction transaction)
     {
         if ( transaction.getMode() == READ )
-            readers-- ;
+            activeReaders-- ;
         else
-            writers-- ;
+            activeWriters-- ;
         activeTransactions.remove(transaction) ;
     }
     
@@ -249,6 +288,10 @@ public class TransactionManager
             log.debug(txn.getLabel()+": "+msg) ;
     }
 
+    synchronized
+    public State state()
+    { return new State(this) ; }
+    
     // LATER.
     class Committer implements Runnable
     {

Modified: incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java?rev=1151627&r1=1151626&r2=1151627&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java Wed Jul 27 21:03:03 2011
@@ -19,9 +19,12 @@
 package com.hp.hpl.jena.tdb.transaction;
 
 
+import java.util.Iterator ;
+
 import org.junit.Test ;
 import org.openjena.atlas.junit.BaseTest ;
 
+import com.hp.hpl.jena.graph.Node ;
 import com.hp.hpl.jena.sparql.core.Quad ;
 import com.hp.hpl.jena.sparql.sse.SSE ;
 import com.hp.hpl.jena.tdb.DatasetGraphTxn ;
@@ -98,10 +101,34 @@ public abstract class AbstractTestTransS
     
     @Test public void trans_05()
     {
-        // READ(block)-WRITE-commit-WRITE-abort-WRITE-commit
+        // READ before WRITE remains seeing old view - READ before WRITE starts 
+        StoreConnection sConn = getStoreConnection() ;
+        DatasetGraphTxn dsgR1 = sConn.begin(ReadWrite.READ) ;
+        DatasetGraphTxn dsgW = sConn.begin(ReadWrite.WRITE) ;
+        
+        dsgW.add(q) ;
+        dsgW.commit() ;
+        dsgW.close() ;
+        
+        assertFalse(dsgR1.contains(q)) ;
+        dsgR1.close() ;
+
+        DatasetGraphTxn dsgR2 = sConn.begin(ReadWrite.READ) ;
+        assertTrue(dsgR2.contains(q)) ;
+        dsgR2.close() ;
+    }
+
+    @Test public void trans_06()
+    {
+        // READ(block)-WRITE-commit-WRITE-abort-WRITE-commit-READ(close)-check
         StoreConnection sConn = getStoreConnection() ;
         DatasetGraphTxn dsgR1 = sConn.begin(ReadWrite.READ) ;
         
+        // IF 
+        // dsgR1.close() ;
+        // THEN it works.
+        // ==> deplay replay
+        
         DatasetGraphTxn dsgW1 = sConn.begin(ReadWrite.WRITE) ;
         dsgW1.add(q1) ;
         dsgW1.commit() ;
@@ -123,27 +150,13 @@ public abstract class AbstractTestTransS
         dsgR1.close() ;
         
         DatasetGraphTxn dsgR2 = sConn.begin(ReadWrite.READ) ;
+        
         assertTrue(dsgR2.contains(q1)) ;
         assertFalse(dsgR2.contains(q2)) ;
         assertTrue(dsgR2.contains(q3)) ;
         dsgR2.close() ;
     }
 
-    @Test public void trans_06()
-    {
-        // READ before WRITE remains seeing old view - READ before WRITE starts 
-        StoreConnection sConn = getStoreConnection() ;
-        DatasetGraphTxn dsgR = sConn.begin(ReadWrite.READ) ;
-        DatasetGraphTxn dsgW = sConn.begin(ReadWrite.WRITE) ;
-        
-        dsgW.add(q) ;
-        dsgW.commit() ;
-        dsgW.close() ;
-        
-        assertFalse(dsgR.contains(q)) ;
-        dsgR.close() ;
-    }
-
     @Test public void trans_07()
     {
         // READ before WRITE remains seeing old view - READ after WRITE starts 

Modified: incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java?rev=1151627&r1=1151626&r2=1151627&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java Wed Jul 27 21:03:03 2011
@@ -19,6 +19,7 @@
 package com.hp.hpl.jena.tdb.transaction;
 
 import static com.hp.hpl.jena.tdb.transaction.TransTestLib.count ;
+import static java.lang.String.format ;
 
 import java.util.concurrent.Callable ;
 import java.util.concurrent.ExecutorService ;
@@ -30,6 +31,10 @@ import org.junit.AfterClass ;
 import org.junit.BeforeClass ;
 import org.openjena.atlas.lib.FileOps ;
 import org.openjena.atlas.lib.Lib ;
+import org.openjena.atlas.lib.RandomLib ;
+import org.openjena.atlas.logging.Log ;
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
 
 import com.hp.hpl.jena.datatypes.xsd.XSDDatatype ;
 import com.hp.hpl.jena.graph.Node ;
@@ -43,23 +48,47 @@ import com.hp.hpl.jena.tdb.base.file.Loc
 /** System testing of the transactions. */
 public class TestTransSystem
 {
+    static { Log.setLog4j() ; }
+    private static Logger log = LoggerFactory.getLogger(TestTransSystem.class) ;
+    static final boolean progress = ! log.isInfoEnabled() ;
+    
+    
+    static final int Iterations = 1 ;
+    static final int readerSeqRepeats = 10 ;    
+    static final int readerMaxPause = 50 ;
+    static final int writerAbortSeqRepeats = 1 ;
+    static final int writerCommitSeqRepeats = 5 ;
+    static final int writerMaxPause = 10 ;
+    
     public static void main(String...args)
     {
-        final int N = 100 ;
+        
+        final int N = (Iterations < 10) ? 1 : Iterations / 10 ;
         int i ;
-        for ( i = 0 ; i < 1000 ; i++ )
+        for ( i = 0 ; i < Iterations ; i++ )
         {
             if ( i%N == 0 )
-                System.out.printf("%03d: ",i) ;
-            System.out.print(".") ;
+                printf("%03d: ",i) ;
+            printf(".") ;
             if ( i%N == (N-1) )
-                System.out.println() ;
+                println() ;
             new TestTransSystem().manyReaderAndOneWriter() ;
         }
         if ( i%N != 0 )
             System.out.println() ;
-        System.out.println() ;
-        System.out.printf("DONE (%03d)\n",i) ;
+        println() ;
+        printf("DONE (%03d)\n",i) ;
+    }
+    
+    private static void println()
+    {
+        printf("\n") ;
+    }
+    
+    private static void printf(String string, Object...args)
+    {
+        if ( progress )
+            System.out.printf(string, args) ;
     }
     
     private ExecutorService execService = Executors.newCachedThreadPool() ;
@@ -124,15 +153,15 @@ public class TestTransSystem
         final int numOfTasks = 10 ;
         final StoreConnection sConn = getStoreConnection() ;
         
-        Callable<?> procR = new Reader(sConn, 10, 50) ;      // Number of repeats, max pause
-        Callable<?> procW_a = new Writer(sConn, 1, 10, false)  // Number of repeats, max pause, commit. 
+        Callable<?> procR = new Reader(sConn, readerSeqRepeats, readerMaxPause) ;      // Number of repeats, max pause
+        Callable<?> procW_a = new Writer(sConn, writerAbortSeqRepeats, writerMaxPause, false)  // Number of repeats, max pause, commit. 
         {
             @Override
             protected int change(DatasetGraphTxn dsg, int id, int i)
             { return changeProc(dsg, id, i) ; }
         } ;
             
-        Callable<?> procW_c = new Writer(sConn, 5, 10, true)  // Number of repeats, max pause, commit. 
+        Callable<?> procW_c = new Writer(sConn, writerCommitSeqRepeats, writerMaxPause, true)  // Number of repeats, max pause, commit. 
         {
             @Override
             protected int change(DatasetGraphTxn dsg, int id, int i)
@@ -158,16 +187,18 @@ public class TestTransSystem
     static int changeProc(DatasetGraphTxn dsg, int id, int i)
     {
         int count = 0 ;
-        int N = 5 ;
+        int maxN = 500 ;
+        int N = RandomLib.qrandom.nextInt(maxN) ;
         for ( int j = 0 ; j < N; j++ )
         {
-            Quad q = genQuad(id+j) ;
+            Quad q = genQuad(id*maxN+j) ;
             if ( ! dsg.contains(q) )
             {
                 dsg.add(q) ;
                 count++ ;
             }
         }
+        log.debug("Change = "+dsg.getDefaultGraph().size()) ;
         return count ;
     }
     
@@ -191,11 +222,14 @@ public class TestTransSystem
             for ( int i = 0 ; i < repeats; i++ )
             {
                 DatasetGraphTxn dsg = sConn.begin(ReadWrite.READ) ;
+                log.debug("reader start "+id+"/"+i) ;                
+                
                 int x1 = count("SELECT * { ?s ?p ?o }", dsg) ;
                 pause(maxpause) ;
                 int x2 = count("SELECT * { ?s ?p ?o }", dsg) ;
                 if ( x1 != x2 )
-                    System.err.printf("Change seen: id=%d: i=%d\n", id, i) ;
+                    log.warn(format("Change seen: id=%d: i=%d\n", id, i)) ;
+                log.debug("reader finish "+id+"/"+i) ;                
                 dsg.close() ;
             }
             return null ;
@@ -224,6 +258,8 @@ public class TestTransSystem
             for ( int i = 0 ; i < repeats ; i++ )
             {
                 DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
+System.err.println("writer "+id+"/"+i) ;                
+                
                 int x1 = count("SELECT * { ?s ?p ?o }", dsg) ;
                 int z = change(dsg, id, i) ;
                 pause(maxpause) ;