You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2016/08/10 20:14:18 UTC

jena git commit: Make code layout more consistent.

Repository: jena
Updated Branches:
  refs/heads/master a71367bc7 -> ce3c8f9aa


Make code layout more consistent.

Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/ce3c8f9a
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/ce3c8f9a
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/ce3c8f9a

Branch: refs/heads/master
Commit: ce3c8f9aaed0f3123767502ce430173258e5bf69
Parents: a71367b
Author: Andy Seaborne <an...@apache.org>
Authored: Wed Aug 10 19:31:58 2016 +0100
Committer: Andy Seaborne <an...@apache.org>
Committed: Wed Aug 10 19:31:58 2016 +0100

----------------------------------------------------------------------
 .../tdb/transaction/TransactionManager.java     | 296 ++++++-------------
 1 file changed, 98 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/ce3c8f9a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
index b591b51..db091e7 100644
--- a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
+++ b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
@@ -67,9 +67,8 @@ public class TransactionManager
     // When improved, rename to chase down any systems directly setting it. 
     public static /*final*/ int QueueBatchSize = setQueueBatchSize() ; 
     
-    private static int setQueueBatchSize() 
-    {
-        if ( SystemTDB.is64bitSystem ) 
+    private static int setQueueBatchSize() {
+        if ( SystemTDB.is64bitSystem )
             return 10 ;
         // On 32bit systems are memory constrained. The Java address space is
         // limited to about 1.5G - the heap can not be bigger.
@@ -81,9 +80,9 @@ public class TransactionManager
     enum TxnPoint { BEGIN, COMMIT, ABORT, CLOSE, QUEUE, UNQUEUE }
     private List<Pair<Transaction, TxnPoint>> transactionStateTransition ;
     
-    private void record(Transaction txn, TxnPoint state)
-    {
-        if ( ! recordHistory ) return ;
+    private void record(Transaction txn, TxnPoint state) {
+        if ( !recordHistory )
+            return ;
         initRecordingState() ;
         transactionStateTransition.add(new Pair<>(txn, state)) ;
     }
@@ -138,8 +137,7 @@ public class TransactionManager
      * 5/ transactionCloses
      */
     
-    private interface TSM
-    {
+    private interface TSM {
         void transactionStarts(Transaction txn) ;
         void transactionFinishes(Transaction txn) ;
         void transactionCloses(Transaction txn) ;
@@ -150,8 +148,7 @@ public class TransactionManager
         void writerAborts(Transaction txn) ;
     }
     
-    class TSM_Base implements TSM
-    {
+    class TSM_Base implements TSM {
         @Override public void transactionStarts(Transaction txn)    {}
         @Override public void transactionFinishes(Transaction txn)  {}
         @Override public void transactionCloses(Transaction txn)    {}
@@ -162,8 +159,7 @@ public class TransactionManager
         @Override public void writerAborts(Transaction txn)         {}
     }
     
-    class TSM_Logger extends TSM_Base
-    {
+    class TSM_Logger extends TSM_Base {
         TSM_Logger() {}
         @Override public void readerStarts(Transaction txn)         { log("start", txn) ; }
         @Override public void readerFinishes(Transaction txn)       { log("finish", txn) ; }
@@ -173,8 +169,7 @@ public class TransactionManager
     }
 
     /** More detailed */
-    class TSM_LoggerDebug extends TSM_Base
-    {
+    class TSM_LoggerDebug extends TSM_Base {
         TSM_LoggerDebug() {}
         @Override public void readerStarts(Transaction txn)         { logInternal("start",  txn) ; }
         @Override public void readerFinishes(Transaction txn)       { logInternal("finish", txn) ; }
@@ -182,11 +177,9 @@ public class TransactionManager
         @Override public void writerCommits(Transaction txn)        { logInternal("commit", txn) ; }
         @Override public void writerAborts(Transaction txn)         { logInternal("abort",  txn) ; }
     }
-
     
     // Mixes stats and state variables :-(
-    class TSM_Counters implements TSM
-    {
+    class TSM_Counters implements TSM {
         TSM_Counters() {}
         @Override public void transactionStarts(Transaction txn)    { activeTransactions.add(txn) ; }
         @Override public void transactionFinishes(Transaction txn)  { activeTransactions.remove(txn) ; }
@@ -209,8 +202,7 @@ public class TransactionManager
     // Policy for writing back journal'ed data to the base datasetgraph
     // Writes if no reader at end of writer, else queues.
     // Queue cleared at en dof any transaction finding itself the only transaction.
-    class TSM_WriteBackEndTxn extends TSM_Base
-    {
+    class TSM_WriteBackEndTxn extends TSM_Base {
         // Safe mode.
         // Take a READ lock over the base dataset.
         // Write-back takes a WRITE lock.
@@ -220,22 +212,22 @@ public class TransactionManager
         
         // Currently, the writer semaphore is managed explicitly in the main code.
         
-        @Override public void readerFinishes(Transaction txn)       
-        { 
+        @Override
+        public void readerFinishes(Transaction txn) {
             txn.getBaseDataset().getLock().leaveCriticalSection() ;
             readerFinishesWorker(txn) ;
         }
 
-        @Override public void writerCommits(Transaction txn)
-        {
+        @Override
+        public void writerCommits(Transaction txn) {
             txn.getBaseDataset().getLock().leaveCriticalSection() ;
             writerCommitsWorker(txn) ;
         }
-        
-        @Override public void writerAborts(Transaction txn)
-        { 
+
+        @Override
+        public void writerAborts(Transaction txn) {
             txn.getBaseDataset().getLock().leaveCriticalSection() ;
-            writerAbortsWorker(txn) ; 
+            writerAbortsWorker(txn) ;
         }
     }
     
@@ -256,8 +248,7 @@ public class TransactionManager
     }
     
 
-    class TSM_Record extends TSM_Base
-    {
+    class TSM_Record extends TSM_Base {
         // Later - record on one list the state transition.
         @Override
         public void transactionStarts(Transaction txn)      { record(txn, BEGIN) ; }
@@ -272,9 +263,8 @@ public class TransactionManager
         (recordHistory ? new TSM_Record() : null ) ,
         new TSM_WriteBackEndTxn()        // Write back policy. Must be last.
     } ;
-    
-    public TransactionManager(DatasetGraphTDB dsg)
-    {
+
+    public TransactionManager(DatasetGraphTDB dsg){
         this.baseDataset = dsg ; 
         this.journal = Journal.create(dsg.getLocation()) ;
         // LATER
@@ -284,55 +274,18 @@ public class TransactionManager
 //        committerThread.start() ;
     }
 
-    public void closedown()
-    {
+    public void closedown() {
         processDelayedReplayQueue(null) ;
         journal.close() ;
     }
 
-    public DatasetGraphTxn begin(ReadWrite mode)
-    {
+    public DatasetGraphTxn begin(ReadWrite mode) {
         return begin(mode, null) ;
     }
     
     public /*for testing only*/ static final boolean DEBUG = false ; 
     
-    /** Control logging - the logger must be set as well */
-    //public /*for testing only*/ static boolean LOG = false ;
-
-//  public Transaction begin(ReadWrite readWrite, boolean canBlock) {
-//  Objects.nonNull(readWrite) ;
-//  checkActive() ;
-//  
-//  if ( canBlock )
-//      exclusivitylock.readLock().lock() ;
-//  else {
-//      if ( ! exclusivitylock.readLock().tryLock() )
-//          return null ;
-//  }
-//  
-//  // Readers never block.
-//  if ( readWrite == WRITE ) {
-//      // Writers take a WRITE permit from the semaphore to ensure there
-//      // is at most one active writer, else the attempt to start the
-//      // transaction blocks.
-//      // Released by in notifyCommitFinish/notifyAbortFinish
-//      boolean b = acquireWriterLock(canBlock) ;
-//      if ( !b ) {
-//          exclusivitylock.readLock().unlock() ;
-//          return null ;
-//      }
-//  }
-//  Transaction transaction = begin$(readWrite) ;
-//  startActiveTransaction(transaction) ;
-//  transaction.begin();
-//  return transaction;
-//}
-
-
-    
-    public DatasetGraphTxn begin(ReadWrite mode, String label)
-    {
+    public DatasetGraphTxn begin(ReadWrite mode, String label) {
         exclusivitylock.readLock().lock() ;
         
         // Not synchronized (else blocking on semaphore will never wake up
@@ -353,8 +306,7 @@ public class TransactionManager
     // of the low level object directly so we'll play safe.  
     
     synchronized
-    private DatasetGraphTxn begin$(ReadWrite mode, String label)
-    {
+    private DatasetGraphTxn begin$(ReadWrite mode, String label) {
         if ( mode == ReadWrite.WRITE && activeWriters.get() > 0 )    // Guard
             throw new TDBTransactionException("Existing active write transaction") ;
 
@@ -367,14 +319,13 @@ public class TransactionManager
         
         DatasetGraphTDB dsg = baseDataset ;
         // *** But, if there are pending, committed transactions, use latest.
-        if ( ! commitedAwaitingFlush.isEmpty() )
-        {  
-            if ( DEBUG ) System.out.print(commitedAwaitingFlush.size()) ;
-            dsg = commitedAwaitingFlush.get(commitedAwaitingFlush.size()-1).getActiveDataset().getView() ;
-        }
-        else 
-        {
-            if ( DEBUG ) System.out.print('_') ;
+        if ( !commitedAwaitingFlush.isEmpty() ) {
+            if ( DEBUG )
+                System.out.print(commitedAwaitingFlush.size()) ;
+            dsg = commitedAwaitingFlush.get(commitedAwaitingFlush.size() - 1).getActiveDataset().getView() ;
+        } else {
+            if ( DEBUG )
+                System.out.print('_') ;
         }
         Transaction txn = createTransaction(dsg, mode, label) ;
         
@@ -387,9 +338,8 @@ public class TransactionManager
         // Empty for READ ; only WRITE transactions have components that need notifiying.
         List<TransactionLifecycle> components = dsgTxn.getTransaction().lifecycleComponents() ;
         
-        if ( mode == ReadWrite.READ )
-        {
-            // ---- Consistency check.  View caching does not reset components.
+        if ( mode == ReadWrite.READ ) {
+            // ---- Consistency check. View caching does not reset components.
             if ( components.size() != 0 )
                 log.warn("read transaction, non-empty lifecycleComponents list") ;
         }
@@ -401,24 +351,20 @@ public class TransactionManager
         return dsgTxn ;
     }
     
-    private Transaction createTransaction(DatasetGraphTDB dsg, ReadWrite mode, String label)
-    {
+    private Transaction createTransaction(DatasetGraphTDB dsg, ReadWrite mode, String label) {
         Transaction txn = new Transaction(dsg, mode, transactionId.getAndIncrement(), label, this) ;
         return txn ;
     }
 
-    private DatasetGraphTxn createDSGTxn(DatasetGraphTDB dsg, Transaction txn, ReadWrite mode)
-    {
+    private DatasetGraphTxn createDSGTxn(DatasetGraphTDB dsg, Transaction txn, ReadWrite mode) {
         // A read transaction (if it has no lifecycle components) can be shared over all
         // read transactions at the same commit level. 
         //    lastreader
         
-        if ( mode == ReadWrite.READ )
-        {   
+        if ( mode == ReadWrite.READ ) {
             // If a READ transaction, and a previously built one is cached, use it.
-            DatasetGraphTDB dsgCached = currentReaderView.get();
-            if ( dsgCached != null )
-            {
+            DatasetGraphTDB dsgCached = currentReaderView.get() ;
+            if ( dsgCached != null ) {
                 // No components so we don't need to notify them.
                 // We can just reuse the storage dataset.
                 return new DatasetGraphTxn(dsgCached, txn) ;
@@ -426,11 +372,10 @@ public class TransactionManager
         }
         
         DatasetGraphTxn dsgTxn = new DatasetBuilderTxn(this).build(txn, mode, dsg) ;
-        if ( mode == ReadWrite.READ )
-        {
+        if ( mode == ReadWrite.READ ) {
             // If a READ transaction, cache the storage view.
             // This is cleared when a WRITE commits
-            currentReaderView.set(dsgTxn.getView());
+            currentReaderView.set(dsgTxn.getView()) ;
         }
         return dsgTxn ;
     }
@@ -441,15 +386,13 @@ public class TransactionManager
      *  together with general recording of transaction details and status. 
      */ 
     synchronized
-    public void notifyCommit(Transaction transaction)
-    {
+    public void notifyCommit(Transaction transaction) {
         if ( ! activeTransactions.contains(transaction) )
             SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ;
         
         noteTxnCommit(transaction) ;
 
-        switch ( transaction.getMode() )
-        {
+        switch ( transaction.getMode() ) {
             case READ: break ;
             case WRITE:
                 currentReaderView.set(null) ;       // Clear the READ transaction cache.
@@ -458,8 +401,7 @@ public class TransactionManager
     }
 
     synchronized
-    public void notifyAbort(Transaction transaction)
-    {
+    public void notifyAbort(Transaction transaction) {
         // Transaction has done the abort on all the transactional elements.
         if ( ! activeTransactions.contains(transaction) )
             SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ;
@@ -588,8 +530,7 @@ public class TransactionManager
 
     /** Try to flush the delayed write queue - only happens if there are no active transactions */ 
     synchronized
-    public void flush()
-    {
+    public void flush() {
         processDelayedReplayQueue(null) ;
     }
     
@@ -597,43 +538,37 @@ public class TransactionManager
     // Called from TSM_WriteBackEndTxn but the worker code is shere so all
     // related code, including queue flushing is close together.
     
-    private void readerFinishesWorker(Transaction txn)
-    {
+    private void readerFinishesWorker(Transaction txn) {
         if ( queue.size() >= QueueBatchSize )
             processDelayedReplayQueue(txn) ;
     }
-    
-    private void writerAbortsWorker(Transaction txn)
-    {
+
+    private void writerAbortsWorker(Transaction txn) {
         if ( queue.size() >= QueueBatchSize )
             processDelayedReplayQueue(txn) ;
     }
     
-    private void writerCommitsWorker(Transaction txn)
-    {
-        if ( activeReaders.get() == 0 && queue.size() >= QueueBatchSize )
-        {
+    private void writerCommitsWorker(Transaction txn) {
+        if ( activeReaders.get() == 0 && queue.size() >= QueueBatchSize ) {
             // Can commit immediately.
             // Ensure the queue is empty though.
-            // Could simply add txn to the commit queue and do it that way.  
-            if ( log() ) log("Commit immediately", txn) ; 
-            
-            // Currently, all we need is 
+            // Could simply add txn to the commit queue and do it that way.
+            if ( log() ) log("Commit immediately", txn) ;
+
+            // Currently, all we need is
             //    JournalControl.replay(txn) ;
             // because that plays queued transactions.
             // But for long term generallity, at the cost of one check of the journal size
             // we do this sequence.
-            
+
             processDelayedReplayQueue(txn) ;
             enactTransaction(txn) ;
             JournalControl.replay(txn) ;
-        }
-        else
-        {
+        } else {
             // Can't write back to the base database at the moment.
             commitedAwaitingFlush.add(txn) ;
             maxQueue = Math.max(commitedAwaitingFlush.size(), maxQueue) ;
-            if ( log() ) log("Add to pending queue", txn) ; 
+            if ( log() ) log("Add to pending queue", txn) ;
             queue.add(txn) ;
         }
 
@@ -642,15 +577,14 @@ public class TransactionManager
     private void processDelayedReplayQueue(Transaction txn)
     {
         // Can we do work?
-        if ( activeReaders.get() != 0 || activeWriters.get() != 0 )
-        {
+        
+        if ( activeReaders.get() != 0 || activeWriters.get() != 0 ) {
             if ( queue.size() > 0 && log() )
                 log(format("Pending transactions: R=%s / W=%s", activeReaders, activeWriters), txn) ;
             return ;
         }
 
-        if ( DEBUG )
-        {
+        if ( DEBUG ) {
             if ( queue.size() > 0 ) 
                 System.out.print("!"+queue.size()+"!") ;
         }
@@ -668,12 +602,11 @@ public class TransactionManager
         // against the updated database.
         currentReaderView.set(null) ;
         
-        while ( queue.size() > 0 )
-        {
+        while (queue.size() > 0) {
             // Currently, replay is replay everything
             // so looping on a per-transaction basis is
-            // pointless but harmless.  
-            
+            // pointless but harmless.
+
             try {
                 Transaction txn2 = queue.take() ;
                 if ( txn2.getMode() == ReadWrite.READ )
@@ -699,32 +632,25 @@ public class TransactionManager
         checkReplaySafe() ;
         if ( log() )
             log("End flush delayed commits", txn) ;
-        
-        
-        
     }
 
-    private void checkNodesDatJrnl(String label, Transaction txn)
-    {
-        if (txn != null)
-        {
-            String x = txn.getBaseDataset().getLocation().getPath(label+": nodes.dat-jrnl") ;
+    private void checkNodesDatJrnl(String label, Transaction txn) {
+        if ( txn != null ) {
+            String x = txn.getBaseDataset().getLocation().getPath(label + ": nodes.dat-jrnl") ;
             long len = new File(x).length() ;
-            if (len != 0)
+            if ( len != 0 )
                 log("nodes.dat-jrnl: not empty", txn) ;
-        }   
+        }
     }
     
-    private void checkReplaySafe()
-    {
+    private void checkReplaySafe() {
         if ( ! checking ) return ;
         if ( activeReaders.get() != 0 || activeWriters.get() != 0 )
             log.error("There are now active transactions") ;
     }
     
     synchronized
-    public void notifyClose(Transaction txn)
-    {
+    public void notifyClose(Transaction txn) {
         if ( txn.getState() == TxnState.ACTIVE )
         {
             String x = txn.getBaseDataset().getLocation().getDirectoryPath() ;
@@ -737,8 +663,7 @@ public class TransactionManager
         noteTxnClose(txn) ;
     }
         
-    private void noteStartTxn(Transaction transaction)
-    {
+    private void noteStartTxn(Transaction transaction) {
         switch (transaction.getMode())
         {
             case READ : readerStarts(transaction) ; break ;
@@ -747,8 +672,7 @@ public class TransactionManager
         transactionStarts(transaction) ;
     }
 
-    private void noteTxnCommit(Transaction transaction)
-    {
+    private void noteTxnCommit(Transaction transaction) {
         switch (transaction.getMode())
         {
             case READ : readerFinishes(transaction) ; break ;
@@ -758,8 +682,7 @@ public class TransactionManager
         exclusivitylock.readLock().unlock() ;
     }
     
-    private void noteTxnAbort(Transaction transaction)
-    {
+    private void noteTxnAbort(Transaction transaction) {
         switch (transaction.getMode())
         {
             case READ : readerFinishes(transaction) ; break ;
@@ -769,8 +692,7 @@ public class TransactionManager
         exclusivitylock.readLock().unlock() ;
     }
     
-    private void noteTxnClose(Transaction transaction)
-    {
+    private void noteTxnClose(Transaction transaction) {
         transactionCloses(transaction) ;
     }
     
@@ -779,27 +701,23 @@ public class TransactionManager
     /** Get recording state */
     public boolean recording()              { return recordHistory ; }
     /** Set recording on or off */
-    public void recording(boolean flag)
-    {
+    public void recording(boolean flag) {
         recordHistory = flag ;
         if ( recordHistory )
             initRecordingState() ;
     }
     /** Clear all recording state - does not clear stats */ 
-    public void clearRecordingState()
-    {
+    public void clearRecordingState() {
         initRecordingState() ;
         transactionStateTransition.clear() ;
     }
     
-    private void initRecordingState()
-    {
+    private void initRecordingState() {
         if ( transactionStateTransition == null )
             transactionStateTransition = new ArrayList<>() ;
     }
 
-    public Journal getJournal()
-    {
+    public Journal getJournal() {
         return journal ;
     }
 
@@ -808,13 +726,11 @@ public class TransactionManager
     
     private final boolean logstate = (syslog.isDebugEnabled() || log.isDebugEnabled()) ;
     
-    private boolean log()
-    {
+    private boolean log() {
         return logstate ;
     }
     
-    private void log(String msg, Transaction txn)
-    {
+    private void log(String msg, Transaction txn) {
         if ( ! log() )
             return ;
         if ( txn == null )
@@ -823,8 +739,7 @@ public class TransactionManager
             logger().debug(txn.getLabel()+": "+msg) ;
     }
     
-    private void logInternal(String action, Transaction txn)
-    {
+    private void logInternal(String action, Transaction txn) {
         if ( ! log() )
             return ;
         String txnStr = ( txn == null ) ? "<null>" : txn.getLabel() ;
@@ -832,8 +747,7 @@ public class TransactionManager
         logger().debug(format("%6s %s -- %s", action, txnStr, state())) ;
     }
 
-    private static Logger logger()
-    {
+    private static Logger logger() {
         if ( syslog.isDebugEnabled() )
             return syslog ;
         else
@@ -841,19 +755,15 @@ public class TransactionManager
     }
     
     synchronized
-    public SysTxnState state()
-    { 
+    public SysTxnState state() {
         return new SysTxnState(this) ;
     }
     
     // LATER.
-    class Committer implements Runnable
-    {
+    class Committer implements Runnable {
         @Override
-        public void run()
-        {
-            for(;;)
-            {
+        public void run() {
+            for ( ;; ) {
                 // Wait until the reader count goes to zero.
                 
                 // This wakes up for every transation but maybe 
@@ -862,68 +772,58 @@ public class TransactionManager
                     Transaction txn = queue.take() ;
                     // This takes a Write lock on the  DSG - this is where it blocks.
                     JournalControl.replay(txn) ;
-                    synchronized(TransactionManager.this)
-                    {
+                    synchronized (TransactionManager.this) {
                         commitedAwaitingFlush.remove(txn) ;
                     }
                 } catch (InterruptedException ex)
                 { Log.fatal(this, "Interruped!", ex) ; }
             }
         }
-        
     }
     
-    private void transactionStarts(Transaction txn)
-    {
+    private void transactionStarts(Transaction txn) {
         for ( TSM tsm : actions )
             if ( tsm != null )
                 tsm.transactionStarts(txn) ;
     }
 
-    private void transactionFinishes(Transaction txn)
-    {
+    private void transactionFinishes(Transaction txn) {
         for ( TSM tsm : actions )
             if ( tsm != null )
                 tsm.transactionFinishes(txn) ;
     }
     
-    private void transactionCloses(Transaction txn)
-    {
+    private void transactionCloses(Transaction txn) {
         for ( TSM tsm : actions )
             if ( tsm != null )
                 tsm.transactionCloses(txn) ;
     }
-    
-    private void readerStarts(Transaction txn)
-    {
+
+    private void readerStarts(Transaction txn) {
         for ( TSM tsm : actions )
             if ( tsm != null )
                 tsm.readerStarts(txn) ;
     }
-    
-    private void readerFinishes(Transaction txn)
-    {
+
+    private void readerFinishes(Transaction txn) {
         for ( TSM tsm : actions )
             if ( tsm != null )
                 tsm.readerFinishes(txn) ;
     }
 
-    private void writerStarts(Transaction txn)
-    {
+    private void writerStarts(Transaction txn) {
         for ( TSM tsm : actions )
             if ( tsm != null )
                 tsm.writerStarts(txn) ;
     }
 
-    private void writerCommits(Transaction txn)
-    {
+    private void writerCommits(Transaction txn) {
         for ( TSM tsm : actions )
             if ( tsm != null )
                 tsm.writerCommits(txn) ;
     }
 
-    private void writerAborts(Transaction txn)
-    {
+    private void writerAborts(Transaction txn) {
         for ( TSM tsm : actions )
             if ( tsm != null )
                 tsm.writerAborts(txn) ;