You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2011/07/26 21:20:31 UTC
svn commit: r1151217 - in /incubator/jena/Experimental/TxTDB/trunk:
src-dev/tx/ src/main/java/com/hp/hpl/jena/tdb/
src/main/java/com/hp/hpl/jena/tdb/transaction/
src/test/java/com/hp/hpl/jena/tdb/transaction/
Author: andy
Date: Tue Jul 26 19:20:29 2011
New Revision: 1151217
URL: http://svn.apache.org/viewvc?rev=1151217&view=rev
Log:
Conncurrency fixes.
Multithreaded reader/writer tests.
Added:
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionMBean.java (with props)
Modified:
incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java
Modified: incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java?rev=1151217&r1=1151216&r2=1151217&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java Tue Jul 26 19:20:29 2011
@@ -19,6 +19,7 @@ public class DevTx
// * Dataset API
// * UUID per committed version to support etags
// * Promote => duplicate even when not necessary. BlockMgr property.
+ // * Monitoring and stats : JMX.
// Tidy up:
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java?rev=1151217&r1=1151216&r2=1151217&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java Tue Jul 26 19:20:29 2011
@@ -64,7 +64,7 @@ public class StoreConnection
private static Map<Location, StoreConnection> cache = new HashMap<Location, StoreConnection>() ;
- public static void reset()
+ public static synchronized void reset()
{
for ( Map.Entry<Location, StoreConnection> e : cache.entrySet() )
e.getValue().baseDSG.close() ;
@@ -72,7 +72,7 @@ public class StoreConnection
cache.clear() ;
}
- public static StoreConnection make(Location location)
+ public static synchronized StoreConnection make(Location location)
{
TDBMaker.releaseLocation(location) ;
StoreConnection sConn = cache.get(location) ;
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java?rev=1151217&r1=1151216&r2=1151217&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java Tue Jul 26 19:20:29 2011
@@ -203,7 +203,8 @@ public class BlockMgrJournal implements
@Override
public void overwrite(Block block)
{
- write(block) ;
+ // We are in a chain of BlockMgrs - pass down to the base.
+ blockMgr.overwrite(block) ;
}
@Override
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java?rev=1151217&r1=1151216&r2=1151217&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java Tue Jul 26 19:20:29 2011
@@ -177,6 +177,7 @@ public class JournalControl
BlockMgr blkMgr = mgrs.get(e.getFileRef()) ;
Block blk = e.getBlock() ;
+ blk.setModified(true) ;
blkMgr.overwrite(blk) ;
// Block blk = blkMgr.getWrite(e.getBlock().getId()) ;
Added: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionMBean.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionMBean.java?rev=1151217&view=auto
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionMBean.java (added)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionMBean.java Tue Jul 26 19:20:29 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.tdb.transaction;
+
+/** Interface to stats for a single Transaction Manager */
+public interface TransactionMBean
+{
+ /** Number of transactions executed */
+ long getTransactionCount() ;
+
+ /** Number of read transactions executed */
+ long getReadTransactionCount() ;
+
+ /** Number of write transactions executed */
+ long getWriteTransactionCount() ;
+
+ /** Number of write transactions that aborted */
+ long getWriteAbortTransactionCount() ;
+
+ /** Number of write transactions that committed */
+ long getWriteCommitTransactionCount() ;
+
+ /** Number of write transactions that have committed but are not written to the base database */
+ long getWriteCommitTransactionPendingCount() ;
+
+ /** Number of write transactions executing */
+ long getCurrentWriteTransactionCount() ;
+
+ /** Number of read transactions executing */
+ long getCurrentReadTransactionCount() ;
+}
+
Propchange: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionMBean.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java?rev=1151217&r1=1151216&r2=1151217&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java Tue Jul 26 19:20:29 2011
@@ -44,7 +44,7 @@ public class TransactionManager
static long transactionId = 1 ;
private int readers = 0 ;
- private int writers = 0 ; // 0 or 1
+ private int writers = 0 ; // 0 or 1
// Misc stats
private int finishedReads = 0 ;
@@ -99,9 +99,9 @@ public class TransactionManager
{
case READ : readers++ ; break ;
case WRITE :
- if ( writers > 0 )
- throw new TDBTransactionException("Existing active transaction") ;
- writers ++ ;
+ int x = writers++ ;
+ if ( x > 0 )
+ throw new TDBTransactionException("Existing active write transaction") ;
break ;
}
@@ -173,7 +173,6 @@ public class TransactionManager
{
log("abort", transaction) ;
// Transaction has done the abort on all the transactional elements.
- // TODO Suppose the system journal has
if ( ! activeTransactions.contains(transaction) )
SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ;
endTransaction(transaction) ;
Modified: incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java?rev=1151217&r1=1151216&r2=1151217&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java Tue Jul 26 19:20:29 2011
@@ -20,69 +20,93 @@ package com.hp.hpl.jena.tdb.transaction;
import static com.hp.hpl.jena.tdb.transaction.TransTestLib.count ;
-import java.util.Date ;
import java.util.concurrent.Callable ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicInteger ;
+import org.junit.AfterClass ;
+import org.junit.BeforeClass ;
import org.openjena.atlas.lib.FileOps ;
import org.openjena.atlas.lib.Lib ;
-import org.openjena.atlas.lib.RandomLib ;
import com.hp.hpl.jena.datatypes.xsd.XSDDatatype ;
import com.hp.hpl.jena.graph.Node ;
import com.hp.hpl.jena.sparql.core.Quad ;
import com.hp.hpl.jena.sparql.sse.SSE ;
-import com.hp.hpl.jena.tdb.ConfigTest ;
import com.hp.hpl.jena.tdb.DatasetGraphTxn ;
import com.hp.hpl.jena.tdb.ReadWrite ;
import com.hp.hpl.jena.tdb.StoreConnection ;
+import com.hp.hpl.jena.tdb.base.file.Location ;
/** System testing of the transactions. */
public class TestTransSystem
{
- static ExecutorService execService = Executors.newCachedThreadPool() ;
+ public static void main(String...args)
+ {
+ final int N = 100 ;
+ int i ;
+ for ( i = 0 ; i < 1000 ; i++ )
+ {
+ if ( i%N == 0 )
+ System.out.printf("%03d: ",i) ;
+ System.out.print(".") ;
+ if ( i%N == (N-1) )
+ System.out.println() ;
+ new TestTransSystem().manyReaderAndOneWriter() ;
+ }
+ if ( i%N != 0 )
+ System.out.println() ;
+ System.out.println() ;
+ System.out.printf("DONE (%03d)\n",i) ;
+ }
+
+ private ExecutorService execService = Executors.newCachedThreadPool() ;
static Quad q = SSE.parseQuad("(_ <s> <p> <o>) ") ;
static Quad q1 = SSE.parseQuad("(_ <s> <p> <o1>)") ;
static Quad q2 = SSE.parseQuad("(_ <s> <p> <o2>)") ;
static Quad q3 = SSE.parseQuad("(_ <s> <p> <o3>)") ;
static Quad q4 = SSE.parseQuad("(_ <s> <p> <o4>)") ;
- static final String DIR = ConfigTest.getTestingDirDB() ;
- private StoreConnection sConn ;
- private int initCount = -1 ;
+ static final Location LOC = Location.mem() ; // new Location(ConfigTest.getTestingDirDB()) ;
static final AtomicInteger gen = new AtomicInteger() ;
- protected StoreConnection getStoreConnection()
+
+ @BeforeClass
+ public static void beforeClass()
{
- FileOps.clearDirectory(DIR) ;
- StoreConnection sConn = StoreConnection.make(DIR) ;
+ if ( ! LOC.isMem() )
+ FileOps.clearDirectory(LOC.getDirectoryPath()) ;
+ StoreConnection.reset() ;
+ StoreConnection sConn = StoreConnection.make(LOC) ;
DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
dsg.add(q1) ;
dsg.add(q2) ;
initCount = 2 ;
dsg.commit() ;
dsg.close() ;
- return sConn ;
}
- public static void main(String...args)
+ @AfterClass
+ public static void afterClass() {}
+
+ private StoreConnection sConn ;
+ private static int initCount = -1 ;
+
+ protected synchronized StoreConnection getStoreConnection()
{
- System.out.println("Starting: "+new Date()) ;
- new TestTransSystem().manyReaderAndOneWriter() ;
- System.out.println("Stopping: "+new Date()) ;
-
+ return StoreConnection.make(LOC) ;
}
+ public TestTransSystem() {}
+
//@Test
public void manyRead()
{
final StoreConnection sConn = getStoreConnection() ;
- Callable<?> proc = new Reader(sConn, 50, 200) ;
-
+ Callable<?> proc = new Reader(sConn, 50, 200) ; // Number of repeats, max pause
- for ( int i = 0 ; i < 50 ; i++ )
+ for ( int i = 0 ; i < 5 ; i++ )
execService.submit(proc) ;
try
{
@@ -94,36 +118,32 @@ public class TestTransSystem
}
}
-
-
//@Test
public void manyReaderAndOneWriter()
{
+ final int numOfTasks = 10 ;
final StoreConnection sConn = getStoreConnection() ;
- Callable<?> procR = new Reader(sConn, 5, 200) ;
- Callable<?> procW = new Writer(sConn, 5, 100, true) {
+ Callable<?> procR = new Reader(sConn, 10, 50) ; // Number of repeats, max pause
+ Callable<?> procW_a = new Writer(sConn, 1, 10, false) // Number of repeats, max pause, commit.
+ {
@Override
protected int change(DatasetGraphTxn dsg, int id, int i)
- {
- int count = 0 ;
- int N = 5 ;
- for ( int j = 0 ; j < N; j++ )
- {
- Quad q = genQuad(N*id+j) ;
- if ( ! dsg.contains(q) )
- {
- dsg.add(q) ;
- count++ ;
- }
- }
- return count ;
- } } ;
+ { return changeProc(dsg, id, i) ; }
+ } ;
- for ( int i = 0 ; i < 500 ; i++ )
+ Callable<?> procW_c = new Writer(sConn, 5, 10, true) // Number of repeats, max pause, commit.
+ {
+ @Override
+ protected int change(DatasetGraphTxn dsg, int id, int i)
+ { return changeProc(dsg, id, i) ; }
+ } ;
+
+ for ( int i = 0 ; i < numOfTasks ; i++ )
{
execService.submit(procR) ;
- execService.submit(procW) ;
+ execService.submit(procW_a) ;
+ execService.submit(procW_c) ;
}
try
{
@@ -135,15 +155,31 @@ public class TestTransSystem
}
}
+ static int changeProc(DatasetGraphTxn dsg, int id, int i)
+ {
+ int count = 0 ;
+ int N = 5 ;
+ for ( int j = 0 ; j < N; j++ )
+ {
+ Quad q = genQuad(id+j) ;
+ if ( ! dsg.contains(q) )
+ {
+ dsg.add(q) ;
+ count++ ;
+ }
+ }
+ return count ;
+ }
+
static class Reader implements Callable<Object>
{
private final int repeats ;
private final int maxpause ;
private final StoreConnection sConn ;
- Reader(StoreConnection sConn, int number, int pause)
+ Reader(StoreConnection sConn, int numSeqRepeats, int pause)
{
- this.repeats = number ;
+ this.repeats = numSeqRepeats ;
this.maxpause = pause ;
this.sConn = sConn ;
}
@@ -173,9 +209,9 @@ public class TestTransSystem
private final StoreConnection sConn ;
private final boolean commit ;
- protected Writer(StoreConnection sConn, int number, int pause, boolean commit)
+ protected Writer(StoreConnection sConn, int numSeqRepeats, int pause, boolean commit)
{
- this.repeats = number ;
+ this.repeats = numSeqRepeats ;
this.maxpause = pause ;
this.sConn = sConn ;
this.commit = commit ;
@@ -185,7 +221,7 @@ public class TestTransSystem
public Object call()
{
int id = gen.incrementAndGet() ;
- for ( int i = 0 ; i < 10; i++ )
+ for ( int i = 0 ; i < repeats ; i++ )
{
DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ;
int x1 = count("SELECT * { ?s ?p ?o }", dsg) ;