You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2011/07/26 21:20:31 UTC

svn commit: r1151217 - in /incubator/jena/Experimental/TxTDB/trunk: src-dev/tx/ src/main/java/com/hp/hpl/jena/tdb/ src/main/java/com/hp/hpl/jena/tdb/transaction/ src/test/java/com/hp/hpl/jena/tdb/transaction/

Author: andy
Date: Tue Jul 26 19:20:29 2011
New Revision: 1151217

URL: http://svn.apache.org/viewvc?rev=1151217&view=rev
Log:
Concurrency fixes.
Multithreaded reader/writer tests.

Added:
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionMBean.java   (with props)
Modified:
    incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
    incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java

Modified: incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java?rev=1151217&r1=1151216&r2=1151217&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java Tue Jul 26 19:20:29 2011
@@ -19,6 +19,7 @@ public class DevTx
     // * Dataset API
     // * UUID per committed version to support etags
     // * Promote => duplicate even when not necessary.  BlockMgr property.
+    // * Monitoring and stats : JMX.
     
     
     // Tidy up:

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java?rev=1151217&r1=1151216&r2=1151217&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java Tue Jul 26 19:20:29 2011
@@ -64,7 +64,7 @@ public class StoreConnection
 
     private static Map<Location, StoreConnection> cache = new HashMap<Location, StoreConnection>() ;
     
-    public static void reset() 
+    public static synchronized void reset() 
     {
         for ( Map.Entry<Location, StoreConnection> e : cache.entrySet() )
             e.getValue().baseDSG.close() ;
@@ -72,7 +72,7 @@ public class StoreConnection
         cache.clear() ;
     }
     
-    public static StoreConnection make(Location location)
+    public static synchronized StoreConnection make(Location location)
     {
         TDBMaker.releaseLocation(location) ;
         StoreConnection sConn = cache.get(location) ;

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java?rev=1151217&r1=1151216&r2=1151217&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java Tue Jul 26 19:20:29 2011
@@ -203,7 +203,8 @@ public class BlockMgrJournal implements 
     @Override
     public void overwrite(Block block)
     {
-        write(block) ;
+        // We are in a chain of BlockMgrs - pass down to the base.
+        blockMgr.overwrite(block) ;
     }
 
     @Override

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java?rev=1151217&r1=1151216&r2=1151217&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java Tue Jul 26 19:20:29 2011
@@ -177,6 +177,7 @@ public class JournalControl
                 
                 BlockMgr blkMgr = mgrs.get(e.getFileRef()) ;
                 Block blk = e.getBlock() ;
+                blk.setModified(true) ;
                 blkMgr.overwrite(blk) ; 
                 
 //                Block blk = blkMgr.getWrite(e.getBlock().getId()) ;

Added: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionMBean.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionMBean.java?rev=1151217&view=auto
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionMBean.java (added)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionMBean.java Tue Jul 26 19:20:29 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.tdb.transaction;
+
+/** Interface to stats for a single Transaction Manager */ 
+public interface TransactionMBean
+{
+    /** Number of transactions executed */
+    long getTransactionCount() ; 
+    
+    /** Number of read transactions executed */
+    long getReadTransactionCount() ; 
+
+    /** Number of write transactions executed */
+    long getWriteTransactionCount() ; 
+
+    /** Number of write transactions that aborted */
+    long getWriteAbortTransactionCount() ; 
+
+    /** Number of write transactions that committed */
+    long getWriteCommitTransactionCount() ; 
+
+    /** Number of write transactions that have committed but are not written to the base database */
+    long getWriteCommitTransactionPendingCount() ; 
+
+    /** Number of write transactions executing */
+    long getCurrentWriteTransactionCount() ; 
+
+    /** Number of read transactions executing */
+    long getCurrentReadTransactionCount() ; 
+}
+

Propchange: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionMBean.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java?rev=1151217&r1=1151216&r2=1151217&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java Tue Jul 26 19:20:29 2011
@@ -44,7 +44,7 @@ public class TransactionManager
     static long transactionId = 1 ;
     
     private int readers = 0 ; 
-    private int writers = 0 ;       // 0 or 1
+    private int writers = 0 ;  // 0 or 1
     
     // Misc stats
     private int finishedReads = 0 ;
@@ -99,9 +99,9 @@ public class TransactionManager
         {
             case READ : readers++ ; break ;
             case WRITE :
-                if ( writers > 0 )
-                    throw new TDBTransactionException("Existing active transaction") ;
-                writers ++ ;
+                int x = writers++ ;
+                if ( x > 0 )
+                    throw new TDBTransactionException("Existing active write transaction") ;
                 break ;
         }
         
@@ -173,7 +173,6 @@ public class TransactionManager
     {
         log("abort", transaction) ;
         // Transaction has done the abort on all the transactional elements.
-        // TODO Suppose the system journal has  
         if ( ! activeTransactions.contains(transaction) )
             SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ;
         endTransaction(transaction) ;

Modified: incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java?rev=1151217&r1=1151216&r2=1151217&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java Tue Jul 26 19:20:29 2011
@@ -20,69 +20,93 @@ package com.hp.hpl.jena.tdb.transaction;
 
 import static com.hp.hpl.jena.tdb.transaction.TransTestLib.count ;
 
-import java.util.Date ;
 import java.util.concurrent.Callable ;
 import java.util.concurrent.ExecutorService ;
 import java.util.concurrent.Executors ;
 import java.util.concurrent.TimeUnit ;
 import java.util.concurrent.atomic.AtomicInteger ;
 
+import org.junit.AfterClass ;
+import org.junit.BeforeClass ;
 import org.openjena.atlas.lib.FileOps ;
 import org.openjena.atlas.lib.Lib ;
-import org.openjena.atlas.lib.RandomLib ;
 
 import com.hp.hpl.jena.datatypes.xsd.XSDDatatype ;
 import com.hp.hpl.jena.graph.Node ;
 import com.hp.hpl.jena.sparql.core.Quad ;
 import com.hp.hpl.jena.sparql.sse.SSE ;
-import com.hp.hpl.jena.tdb.ConfigTest ;
 import com.hp.hpl.jena.tdb.DatasetGraphTxn ;
 import com.hp.hpl.jena.tdb.ReadWrite ;
 import com.hp.hpl.jena.tdb.StoreConnection ;
+import com.hp.hpl.jena.tdb.base.file.Location ;
 
 /** System testing of the transactions. */
 public class TestTransSystem
 {
-    static ExecutorService execService = Executors.newCachedThreadPool() ;
+    public static void main(String...args)
+    {
+        final int N = 100 ;
+        int i ;
+        for ( i = 0 ; i < 1000 ; i++ )
+        {
+            if ( i%N == 0 )
+                System.out.printf("%03d: ",i) ;
+            System.out.print(".") ;
+            if ( i%N == (N-1) )
+                System.out.println() ;
+            new TestTransSystem().manyReaderAndOneWriter() ;
+        }
+        if ( i%N != 0 )
+            System.out.println() ;
+        System.out.println() ;
+        System.out.printf("DONE (%03d)\n",i) ;
+    }
+    
+    private ExecutorService execService = Executors.newCachedThreadPool() ;
     static Quad q  = SSE.parseQuad("(_ <s> <p> <o>) ") ;
     static Quad q1 = SSE.parseQuad("(_ <s> <p> <o1>)") ;
     static Quad q2 = SSE.parseQuad("(_ <s> <p> <o2>)") ;
     static Quad q3 = SSE.parseQuad("(_ <s> <p> <o3>)") ;
     static Quad q4 = SSE.parseQuad("(_ <s> <p> <o4>)") ;
     
-    static final String DIR = ConfigTest.getTestingDirDB() ;
-    private StoreConnection sConn ;
-    private int initCount = -1 ;
+    static final Location LOC = Location.mem() ; // new Location(ConfigTest.getTestingDirDB()) ;
     static final AtomicInteger gen = new AtomicInteger() ;
-    protected StoreConnection getStoreConnection()
+    
+    @BeforeClass 
+    public static void beforeClass()
     {
-        FileOps.clearDirectory(DIR) ;
-        StoreConnection sConn = StoreConnection.make(DIR) ;
+        if ( ! LOC.isMem() )
+            FileOps.clearDirectory(LOC.getDirectoryPath()) ;
+        StoreConnection.reset() ;
+        StoreConnection sConn = StoreConnection.make(LOC) ;
         DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
         dsg.add(q1) ;
         dsg.add(q2) ;
         initCount = 2 ;
         dsg.commit() ;
         dsg.close() ;
-        return sConn ;
     }
     
-    public static void main(String...args)
+    @AfterClass 
+    public static void afterClass() {}
+
+    private StoreConnection sConn ;
+    private static int initCount = -1 ;
+
+    protected synchronized StoreConnection getStoreConnection()
     {
-        System.out.println("Starting: "+new Date()) ;
-        new TestTransSystem().manyReaderAndOneWriter() ;
-        System.out.println("Stopping: "+new Date()) ;
-        
+        return StoreConnection.make(LOC) ;
     }
     
+    public TestTransSystem() {}
+    
     //@Test
     public void manyRead()
     {
         final StoreConnection sConn = getStoreConnection() ;
-        Callable<?> proc = new Reader(sConn, 50, 200)  ;
-
+        Callable<?> proc = new Reader(sConn, 50, 200)  ;        // Number of repeats, max pause
             
-        for ( int i = 0 ; i < 50 ; i++ )
+        for ( int i = 0 ; i < 5 ; i++ )
             execService.submit(proc) ;
         try
         {
@@ -94,36 +118,32 @@ public class TestTransSystem
         }
     }
     
-    
-    
     //@Test
     public void manyReaderAndOneWriter()
     {
+        final int numOfTasks = 10 ;
         final StoreConnection sConn = getStoreConnection() ;
         
-        Callable<?> procR = new Reader(sConn, 5, 200) ;
-        Callable<?> procW = new Writer(sConn, 5, 100, true) {
+        Callable<?> procR = new Reader(sConn, 10, 50) ;      // Number of repeats, max pause
+        Callable<?> procW_a = new Writer(sConn, 1, 10, false)  // Number of repeats, max pause, commit. 
+        {
             @Override
             protected int change(DatasetGraphTxn dsg, int id, int i)
-            {
-                int count = 0 ;
-                int N = 5 ;
-                for ( int j = 0 ; j < N; j++ )
-                {
-                    Quad q = genQuad(N*id+j) ;
-                    if ( ! dsg.contains(q) )
-                    {
-                        dsg.add(q) ;
-                        count++ ;
-                    }
-                }
-                return count ;
-            } } ;
+            { return changeProc(dsg, id, i) ; }
+        } ;
             
-        for ( int i = 0 ; i < 500 ; i++ )
+        Callable<?> procW_c = new Writer(sConn, 5, 10, true)  // Number of repeats, max pause, commit. 
+        {
+            @Override
+            protected int change(DatasetGraphTxn dsg, int id, int i)
+            { return changeProc(dsg, id, i) ; }
+        } ;
+
+        for ( int i = 0 ; i < numOfTasks ; i++ )
         {
             execService.submit(procR) ;   
-            execService.submit(procW) ;
+            execService.submit(procW_a) ;
+            execService.submit(procW_c) ;
         }
         try
         {
@@ -135,15 +155,31 @@ public class TestTransSystem
         } 
     }
 
+    static int changeProc(DatasetGraphTxn dsg, int id, int i)
+    {
+        int count = 0 ;
+        int N = 5 ;
+        for ( int j = 0 ; j < N; j++ )
+        {
+            Quad q = genQuad(id+j) ;
+            if ( ! dsg.contains(q) )
+            {
+                dsg.add(q) ;
+                count++ ;
+            }
+        }
+        return count ;
+    }
+    
     static class Reader implements Callable<Object>
     {
         private final int repeats ;
         private final int maxpause ;
         private final StoreConnection sConn ; 
     
-        Reader(StoreConnection sConn, int number, int pause)
+        Reader(StoreConnection sConn, int numSeqRepeats, int pause)
         {
-            this.repeats = number ;
+            this.repeats = numSeqRepeats ;
             this.maxpause = pause ;
             this.sConn = sConn ;
         }
@@ -173,9 +209,9 @@ public class TestTransSystem
         private final StoreConnection sConn ;
         private final boolean commit ; 
     
-        protected Writer(StoreConnection sConn, int number, int pause, boolean commit)
+        protected Writer(StoreConnection sConn, int numSeqRepeats, int pause, boolean commit)
         {
-            this.repeats = number ;
+            this.repeats = numSeqRepeats ;
             this.maxpause = pause ;
             this.sConn = sConn ;
             this.commit = commit ;
@@ -185,7 +221,7 @@ public class TestTransSystem
         public Object call()
         {
             int id = gen.incrementAndGet() ;
-            for ( int i = 0 ; i < 10; i++ )
+            for ( int i = 0 ; i < repeats ; i++ )
             {
                 DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
                 int x1 = count("SELECT * { ?s ?p ?o }", dsg) ;