You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2016/08/10 20:14:18 UTC
jena git commit: Make code layout more consistent.
Repository: jena
Updated Branches:
refs/heads/master a71367bc7 -> ce3c8f9aa
Make code layout more consistent.
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/ce3c8f9a
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/ce3c8f9a
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/ce3c8f9a
Branch: refs/heads/master
Commit: ce3c8f9aaed0f3123767502ce430173258e5bf69
Parents: a71367b
Author: Andy Seaborne <an...@apache.org>
Authored: Wed Aug 10 19:31:58 2016 +0100
Committer: Andy Seaborne <an...@apache.org>
Committed: Wed Aug 10 19:31:58 2016 +0100
----------------------------------------------------------------------
.../tdb/transaction/TransactionManager.java | 296 ++++++-------------
1 file changed, 98 insertions(+), 198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/ce3c8f9a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
index b591b51..db091e7 100644
--- a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
+++ b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
@@ -67,9 +67,8 @@ public class TransactionManager
// When improved, rename to chase down any systems directly setting it.
public static /*final*/ int QueueBatchSize = setQueueBatchSize() ;
- private static int setQueueBatchSize()
- {
- if ( SystemTDB.is64bitSystem )
+ private static int setQueueBatchSize() {
+ if ( SystemTDB.is64bitSystem )
return 10 ;
// On 32bit systems are memory constrained. The Java address space is
// limited to about 1.5G - the heap can not be bigger.
@@ -81,9 +80,9 @@ public class TransactionManager
enum TxnPoint { BEGIN, COMMIT, ABORT, CLOSE, QUEUE, UNQUEUE }
private List<Pair<Transaction, TxnPoint>> transactionStateTransition ;
- private void record(Transaction txn, TxnPoint state)
- {
- if ( ! recordHistory ) return ;
+ private void record(Transaction txn, TxnPoint state) {
+ if ( !recordHistory )
+ return ;
initRecordingState() ;
transactionStateTransition.add(new Pair<>(txn, state)) ;
}
@@ -138,8 +137,7 @@ public class TransactionManager
* 5/ transactionCloses
*/
- private interface TSM
- {
+ private interface TSM {
void transactionStarts(Transaction txn) ;
void transactionFinishes(Transaction txn) ;
void transactionCloses(Transaction txn) ;
@@ -150,8 +148,7 @@ public class TransactionManager
void writerAborts(Transaction txn) ;
}
- class TSM_Base implements TSM
- {
+ class TSM_Base implements TSM {
@Override public void transactionStarts(Transaction txn) {}
@Override public void transactionFinishes(Transaction txn) {}
@Override public void transactionCloses(Transaction txn) {}
@@ -162,8 +159,7 @@ public class TransactionManager
@Override public void writerAborts(Transaction txn) {}
}
- class TSM_Logger extends TSM_Base
- {
+ class TSM_Logger extends TSM_Base {
TSM_Logger() {}
@Override public void readerStarts(Transaction txn) { log("start", txn) ; }
@Override public void readerFinishes(Transaction txn) { log("finish", txn) ; }
@@ -173,8 +169,7 @@ public class TransactionManager
}
/** More detailed */
- class TSM_LoggerDebug extends TSM_Base
- {
+ class TSM_LoggerDebug extends TSM_Base {
TSM_LoggerDebug() {}
@Override public void readerStarts(Transaction txn) { logInternal("start", txn) ; }
@Override public void readerFinishes(Transaction txn) { logInternal("finish", txn) ; }
@@ -182,11 +177,9 @@ public class TransactionManager
@Override public void writerCommits(Transaction txn) { logInternal("commit", txn) ; }
@Override public void writerAborts(Transaction txn) { logInternal("abort", txn) ; }
}
-
// Mixes stats and state variables :-(
- class TSM_Counters implements TSM
- {
+ class TSM_Counters implements TSM {
TSM_Counters() {}
@Override public void transactionStarts(Transaction txn) { activeTransactions.add(txn) ; }
@Override public void transactionFinishes(Transaction txn) { activeTransactions.remove(txn) ; }
@@ -209,8 +202,7 @@ public class TransactionManager
// Policy for writing back journal'ed data to the base datasetgraph
// Writes if no reader at end of writer, else queues.
// Queue cleared at end of any transaction finding itself the only transaction.
- class TSM_WriteBackEndTxn extends TSM_Base
- {
+ class TSM_WriteBackEndTxn extends TSM_Base {
// Safe mode.
// Take a READ lock over the base dataset.
// Write-back takes a WRITE lock.
@@ -220,22 +212,22 @@ public class TransactionManager
// Currently, the writer semaphore is managed explicitly in the main code.
- @Override public void readerFinishes(Transaction txn)
- {
+ @Override
+ public void readerFinishes(Transaction txn) {
txn.getBaseDataset().getLock().leaveCriticalSection() ;
readerFinishesWorker(txn) ;
}
- @Override public void writerCommits(Transaction txn)
- {
+ @Override
+ public void writerCommits(Transaction txn) {
txn.getBaseDataset().getLock().leaveCriticalSection() ;
writerCommitsWorker(txn) ;
}
-
- @Override public void writerAborts(Transaction txn)
- {
+
+ @Override
+ public void writerAborts(Transaction txn) {
txn.getBaseDataset().getLock().leaveCriticalSection() ;
- writerAbortsWorker(txn) ;
+ writerAbortsWorker(txn) ;
}
}
@@ -256,8 +248,7 @@ public class TransactionManager
}
- class TSM_Record extends TSM_Base
- {
+ class TSM_Record extends TSM_Base {
// Later - record on one list the state transition.
@Override
public void transactionStarts(Transaction txn) { record(txn, BEGIN) ; }
@@ -272,9 +263,8 @@ public class TransactionManager
(recordHistory ? new TSM_Record() : null ) ,
new TSM_WriteBackEndTxn() // Write back policy. Must be last.
} ;
-
- public TransactionManager(DatasetGraphTDB dsg)
- {
+
+ public TransactionManager(DatasetGraphTDB dsg){
this.baseDataset = dsg ;
this.journal = Journal.create(dsg.getLocation()) ;
// LATER
@@ -284,55 +274,18 @@ public class TransactionManager
// committerThread.start() ;
}
- public void closedown()
- {
+ public void closedown() {
processDelayedReplayQueue(null) ;
journal.close() ;
}
- public DatasetGraphTxn begin(ReadWrite mode)
- {
+ public DatasetGraphTxn begin(ReadWrite mode) {
return begin(mode, null) ;
}
public /*for testing only*/ static final boolean DEBUG = false ;
- /** Control logging - the logger must be set as well */
- //public /*for testing only*/ static boolean LOG = false ;
-
-// public Transaction begin(ReadWrite readWrite, boolean canBlock) {
-// Objects.nonNull(readWrite) ;
-// checkActive() ;
-//
-// if ( canBlock )
-// exclusivitylock.readLock().lock() ;
-// else {
-// if ( ! exclusivitylock.readLock().tryLock() )
-// return null ;
-// }
-//
-// // Readers never block.
-// if ( readWrite == WRITE ) {
-// // Writers take a WRITE permit from the semaphore to ensure there
-// // is at most one active writer, else the attempt to start the
-// // transaction blocks.
-// // Released by in notifyCommitFinish/notifyAbortFinish
-// boolean b = acquireWriterLock(canBlock) ;
-// if ( !b ) {
-// exclusivitylock.readLock().unlock() ;
-// return null ;
-// }
-// }
-// Transaction transaction = begin$(readWrite) ;
-// startActiveTransaction(transaction) ;
-// transaction.begin();
-// return transaction;
-//}
-
-
-
- public DatasetGraphTxn begin(ReadWrite mode, String label)
- {
+ public DatasetGraphTxn begin(ReadWrite mode, String label) {
exclusivitylock.readLock().lock() ;
// Not synchronized (else blocking on semaphore will never wake up
@@ -353,8 +306,7 @@ public class TransactionManager
// of the low level object directly so we'll play safe.
synchronized
- private DatasetGraphTxn begin$(ReadWrite mode, String label)
- {
+ private DatasetGraphTxn begin$(ReadWrite mode, String label) {
if ( mode == ReadWrite.WRITE && activeWriters.get() > 0 ) // Guard
throw new TDBTransactionException("Existing active write transaction") ;
@@ -367,14 +319,13 @@ public class TransactionManager
DatasetGraphTDB dsg = baseDataset ;
// *** But, if there are pending, committed transactions, use latest.
- if ( ! commitedAwaitingFlush.isEmpty() )
- {
- if ( DEBUG ) System.out.print(commitedAwaitingFlush.size()) ;
- dsg = commitedAwaitingFlush.get(commitedAwaitingFlush.size()-1).getActiveDataset().getView() ;
- }
- else
- {
- if ( DEBUG ) System.out.print('_') ;
+ if ( !commitedAwaitingFlush.isEmpty() ) {
+ if ( DEBUG )
+ System.out.print(commitedAwaitingFlush.size()) ;
+ dsg = commitedAwaitingFlush.get(commitedAwaitingFlush.size() - 1).getActiveDataset().getView() ;
+ } else {
+ if ( DEBUG )
+ System.out.print('_') ;
}
Transaction txn = createTransaction(dsg, mode, label) ;
@@ -387,9 +338,8 @@ public class TransactionManager
// Empty for READ ; only WRITE transactions have components that need notifying.
List<TransactionLifecycle> components = dsgTxn.getTransaction().lifecycleComponents() ;
- if ( mode == ReadWrite.READ )
- {
- // ---- Consistency check. View caching does not reset components.
+ if ( mode == ReadWrite.READ ) {
+ // ---- Consistency check. View caching does not reset components.
if ( components.size() != 0 )
log.warn("read transaction, non-empty lifecycleComponents list") ;
}
@@ -401,24 +351,20 @@ public class TransactionManager
return dsgTxn ;
}
- private Transaction createTransaction(DatasetGraphTDB dsg, ReadWrite mode, String label)
- {
+ private Transaction createTransaction(DatasetGraphTDB dsg, ReadWrite mode, String label) {
Transaction txn = new Transaction(dsg, mode, transactionId.getAndIncrement(), label, this) ;
return txn ;
}
- private DatasetGraphTxn createDSGTxn(DatasetGraphTDB dsg, Transaction txn, ReadWrite mode)
- {
+ private DatasetGraphTxn createDSGTxn(DatasetGraphTDB dsg, Transaction txn, ReadWrite mode) {
// A read transaction (if it has no lifecycle components) can be shared over all
// read transactions at the same commit level.
// lastreader
- if ( mode == ReadWrite.READ )
- {
+ if ( mode == ReadWrite.READ ) {
// If a READ transaction, and a previously built one is cached, use it.
- DatasetGraphTDB dsgCached = currentReaderView.get();
- if ( dsgCached != null )
- {
+ DatasetGraphTDB dsgCached = currentReaderView.get() ;
+ if ( dsgCached != null ) {
// No components so we don't need to notify them.
// We can just reuse the storage dataset.
return new DatasetGraphTxn(dsgCached, txn) ;
@@ -426,11 +372,10 @@ public class TransactionManager
}
DatasetGraphTxn dsgTxn = new DatasetBuilderTxn(this).build(txn, mode, dsg) ;
- if ( mode == ReadWrite.READ )
- {
+ if ( mode == ReadWrite.READ ) {
// If a READ transaction, cache the storage view.
// This is cleared when a WRITE commits
- currentReaderView.set(dsgTxn.getView());
+ currentReaderView.set(dsgTxn.getView()) ;
}
return dsgTxn ;
}
@@ -441,15 +386,13 @@ public class TransactionManager
* together with general recording of transaction details and status.
*/
synchronized
- public void notifyCommit(Transaction transaction)
- {
+ public void notifyCommit(Transaction transaction) {
if ( ! activeTransactions.contains(transaction) )
SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ;
noteTxnCommit(transaction) ;
- switch ( transaction.getMode() )
- {
+ switch ( transaction.getMode() ) {
case READ: break ;
case WRITE:
currentReaderView.set(null) ; // Clear the READ transaction cache.
@@ -458,8 +401,7 @@ public class TransactionManager
}
synchronized
- public void notifyAbort(Transaction transaction)
- {
+ public void notifyAbort(Transaction transaction) {
// Transaction has done the abort on all the transactional elements.
if ( ! activeTransactions.contains(transaction) )
SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ;
@@ -588,8 +530,7 @@ public class TransactionManager
/** Try to flush the delayed write queue - only happens if there are no active transactions */
synchronized
- public void flush()
- {
+ public void flush() {
processDelayedReplayQueue(null) ;
}
@@ -597,43 +538,37 @@ public class TransactionManager
// Called from TSM_WriteBackEndTxn but the worker code is here so all
// related code, including queue flushing is close together.
- private void readerFinishesWorker(Transaction txn)
- {
+ private void readerFinishesWorker(Transaction txn) {
if ( queue.size() >= QueueBatchSize )
processDelayedReplayQueue(txn) ;
}
-
- private void writerAbortsWorker(Transaction txn)
- {
+
+ private void writerAbortsWorker(Transaction txn) {
if ( queue.size() >= QueueBatchSize )
processDelayedReplayQueue(txn) ;
}
- private void writerCommitsWorker(Transaction txn)
- {
- if ( activeReaders.get() == 0 && queue.size() >= QueueBatchSize )
- {
+ private void writerCommitsWorker(Transaction txn) {
+ if ( activeReaders.get() == 0 && queue.size() >= QueueBatchSize ) {
// Can commit immediately.
// Ensure the queue is empty though.
- // Could simply add txn to the commit queue and do it that way.
- if ( log() ) log("Commit immediately", txn) ;
-
- // Currently, all we need is
+ // Could simply add txn to the commit queue and do it that way.
+ if ( log() ) log("Commit immediately", txn) ;
+
+ // Currently, all we need is
// JournalControl.replay(txn) ;
// because that plays queued transactions.
// But for long term generality, at the cost of one check of the journal size
// we do this sequence.
-
+
processDelayedReplayQueue(txn) ;
enactTransaction(txn) ;
JournalControl.replay(txn) ;
- }
- else
- {
+ } else {
// Can't write back to the base database at the moment.
commitedAwaitingFlush.add(txn) ;
maxQueue = Math.max(commitedAwaitingFlush.size(), maxQueue) ;
- if ( log() ) log("Add to pending queue", txn) ;
+ if ( log() ) log("Add to pending queue", txn) ;
queue.add(txn) ;
}
@@ -642,15 +577,14 @@ public class TransactionManager
private void processDelayedReplayQueue(Transaction txn)
{
// Can we do work?
- if ( activeReaders.get() != 0 || activeWriters.get() != 0 )
- {
+
+ if ( activeReaders.get() != 0 || activeWriters.get() != 0 ) {
if ( queue.size() > 0 && log() )
log(format("Pending transactions: R=%s / W=%s", activeReaders, activeWriters), txn) ;
return ;
}
- if ( DEBUG )
- {
+ if ( DEBUG ) {
if ( queue.size() > 0 )
System.out.print("!"+queue.size()+"!") ;
}
@@ -668,12 +602,11 @@ public class TransactionManager
// against the updated database.
currentReaderView.set(null) ;
- while ( queue.size() > 0 )
- {
+ while (queue.size() > 0) {
// Currently, replay is replay everything
// so looping on a per-transaction basis is
- // pointless but harmless.
-
+ // pointless but harmless.
+
try {
Transaction txn2 = queue.take() ;
if ( txn2.getMode() == ReadWrite.READ )
@@ -699,32 +632,25 @@ public class TransactionManager
checkReplaySafe() ;
if ( log() )
log("End flush delayed commits", txn) ;
-
-
-
}
- private void checkNodesDatJrnl(String label, Transaction txn)
- {
- if (txn != null)
- {
- String x = txn.getBaseDataset().getLocation().getPath(label+": nodes.dat-jrnl") ;
+ private void checkNodesDatJrnl(String label, Transaction txn) {
+ if ( txn != null ) {
+ String x = txn.getBaseDataset().getLocation().getPath(label + ": nodes.dat-jrnl") ;
long len = new File(x).length() ;
- if (len != 0)
+ if ( len != 0 )
log("nodes.dat-jrnl: not empty", txn) ;
- }
+ }
}
- private void checkReplaySafe()
- {
+ private void checkReplaySafe() {
if ( ! checking ) return ;
if ( activeReaders.get() != 0 || activeWriters.get() != 0 )
log.error("There are now active transactions") ;
}
synchronized
- public void notifyClose(Transaction txn)
- {
+ public void notifyClose(Transaction txn) {
if ( txn.getState() == TxnState.ACTIVE )
{
String x = txn.getBaseDataset().getLocation().getDirectoryPath() ;
@@ -737,8 +663,7 @@ public class TransactionManager
noteTxnClose(txn) ;
}
- private void noteStartTxn(Transaction transaction)
- {
+ private void noteStartTxn(Transaction transaction) {
switch (transaction.getMode())
{
case READ : readerStarts(transaction) ; break ;
@@ -747,8 +672,7 @@ public class TransactionManager
transactionStarts(transaction) ;
}
- private void noteTxnCommit(Transaction transaction)
- {
+ private void noteTxnCommit(Transaction transaction) {
switch (transaction.getMode())
{
case READ : readerFinishes(transaction) ; break ;
@@ -758,8 +682,7 @@ public class TransactionManager
exclusivitylock.readLock().unlock() ;
}
- private void noteTxnAbort(Transaction transaction)
- {
+ private void noteTxnAbort(Transaction transaction) {
switch (transaction.getMode())
{
case READ : readerFinishes(transaction) ; break ;
@@ -769,8 +692,7 @@ public class TransactionManager
exclusivitylock.readLock().unlock() ;
}
- private void noteTxnClose(Transaction transaction)
- {
+ private void noteTxnClose(Transaction transaction) {
transactionCloses(transaction) ;
}
@@ -779,27 +701,23 @@ public class TransactionManager
/** Get recording state */
public boolean recording() { return recordHistory ; }
/** Set recording on or off */
- public void recording(boolean flag)
- {
+ public void recording(boolean flag) {
recordHistory = flag ;
if ( recordHistory )
initRecordingState() ;
}
/** Clear all recording state - does not clear stats */
- public void clearRecordingState()
- {
+ public void clearRecordingState() {
initRecordingState() ;
transactionStateTransition.clear() ;
}
- private void initRecordingState()
- {
+ private void initRecordingState() {
if ( transactionStateTransition == null )
transactionStateTransition = new ArrayList<>() ;
}
- public Journal getJournal()
- {
+ public Journal getJournal() {
return journal ;
}
@@ -808,13 +726,11 @@ public class TransactionManager
private final boolean logstate = (syslog.isDebugEnabled() || log.isDebugEnabled()) ;
- private boolean log()
- {
+ private boolean log() {
return logstate ;
}
- private void log(String msg, Transaction txn)
- {
+ private void log(String msg, Transaction txn) {
if ( ! log() )
return ;
if ( txn == null )
@@ -823,8 +739,7 @@ public class TransactionManager
logger().debug(txn.getLabel()+": "+msg) ;
}
- private void logInternal(String action, Transaction txn)
- {
+ private void logInternal(String action, Transaction txn) {
if ( ! log() )
return ;
String txnStr = ( txn == null ) ? "<null>" : txn.getLabel() ;
@@ -832,8 +747,7 @@ public class TransactionManager
logger().debug(format("%6s %s -- %s", action, txnStr, state())) ;
}
- private static Logger logger()
- {
+ private static Logger logger() {
if ( syslog.isDebugEnabled() )
return syslog ;
else
@@ -841,19 +755,15 @@ public class TransactionManager
}
synchronized
- public SysTxnState state()
- {
+ public SysTxnState state() {
return new SysTxnState(this) ;
}
// LATER.
- class Committer implements Runnable
- {
+ class Committer implements Runnable {
@Override
- public void run()
- {
- for(;;)
- {
+ public void run() {
+ for ( ;; ) {
// Wait until the reader count goes to zero.
// This wakes up for every transaction but maybe
@@ -862,68 +772,58 @@ public class TransactionManager
Transaction txn = queue.take() ;
// This takes a Write lock on the DSG - this is where it blocks.
JournalControl.replay(txn) ;
- synchronized(TransactionManager.this)
- {
+ synchronized (TransactionManager.this) {
commitedAwaitingFlush.remove(txn) ;
}
} catch (InterruptedException ex)
{ Log.fatal(this, "Interruped!", ex) ; }
}
}
-
}
- private void transactionStarts(Transaction txn)
- {
+ private void transactionStarts(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.transactionStarts(txn) ;
}
- private void transactionFinishes(Transaction txn)
- {
+ private void transactionFinishes(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.transactionFinishes(txn) ;
}
- private void transactionCloses(Transaction txn)
- {
+ private void transactionCloses(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.transactionCloses(txn) ;
}
-
- private void readerStarts(Transaction txn)
- {
+
+ private void readerStarts(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.readerStarts(txn) ;
}
-
- private void readerFinishes(Transaction txn)
- {
+
+ private void readerFinishes(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.readerFinishes(txn) ;
}
- private void writerStarts(Transaction txn)
- {
+ private void writerStarts(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.writerStarts(txn) ;
}
- private void writerCommits(Transaction txn)
- {
+ private void writerCommits(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.writerCommits(txn) ;
}
- private void writerAborts(Transaction txn)
- {
+ private void writerAborts(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.writerAborts(txn) ;