You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2017/10/03 19:34:10 UTC
[24/65] [abbrv] jena git commit: JENA-1397: Rename java packages
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinator.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinator.java
deleted file mode 100644
index 8a866a9..0000000
--- a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinator.java
+++ /dev/null
@@ -1,805 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.transaction.txn;
-
-import static org.apache.jena.query.ReadWrite.WRITE ;
-import static org.seaborne.dboe.transaction.txn.journal.JournalEntryType.UNDO ;
-
-import java.nio.ByteBuffer ;
-import java.util.ArrayList ;
-import java.util.Iterator ;
-import java.util.List ;
-import java.util.Objects ;
-import java.util.concurrent.ConcurrentHashMap ;
-import java.util.concurrent.Semaphore ;
-import java.util.concurrent.atomic.AtomicLong ;
-import java.util.concurrent.locks.ReadWriteLock ;
-import java.util.concurrent.locks.ReentrantReadWriteLock ;
-
-import org.apache.jena.atlas.logging.Log ;
-import org.apache.jena.query.ReadWrite ;
-import org.seaborne.dboe.base.file.Location ;
-import org.seaborne.dboe.sys.Sys ;
-import org.seaborne.dboe.transaction.txn.journal.Journal ;
-import org.seaborne.dboe.transaction.txn.journal.JournalEntry ;
-import org.slf4j.Logger ;
-
-/**
- * One {@code TransactionCoordinator} per group of {@link TransactionalComponent}s.
- * {@link TransactionalComponent}s can not be shared across TransactionCoordinators.
- * <p>
- * This is a general engine although tested and most used for multiple-reader
- * and single-writer (MR+SW). {@link TransactionalComponentLifecycle} provides the
- * per-thread style.
- * <p>
- * Contrast to MRSW: multiple-reader or single-writer.
- * <h3>Block writers</h3>
- * Block until no writers are active.
- * When this returns, this guarantees that the database is not changing
- * and the journal is flushed to disk.
- * <p>
- * See {@link #blockWriters()}, {@link #enableWriters()}, {@link #execAsWriter(Runnable)}
- * <h3>Exclusive mode</h3>
- * Exclusive mode is when the current thread is the only active code : no readers, no writers.
- * <p>
- * See {@link #startExclusiveMode()}/{@link #tryExclusiveMode()} {@link #finishExclusiveMode()}, {@link #execExclusive(Runnable)}
- *
- * @see Transaction
- * @see TransactionalComponent
- * @see TransactionalSystem
- */
-final
-public class TransactionCoordinator {
- private static Logger log = Sys.syslog ;
-
- private final Journal journal ;
- private boolean coordinatorStarted = false ;
-
- private final ComponentGroup components = new ComponentGroup() ;
- // Components
- private ComponentGroup txnComponents = null ;
- private List<ShutdownHook> shutdownHooks ;
- private TxnIdGenerator txnIdGenerator = TxnIdFactory.txnIdGenSimple ;
-
- private QuorumGenerator quorumGenerator = null ;
- //private QuorumGenerator quorumGenerator = (m) -> components ;
-
- // Semaphore to implement "Single Active Writer" - independent of readers
- // This is not reentrant.
- private Semaphore writersWaiting = new Semaphore(1, true) ;
-
- // All transaction need a "read" lock through out their lifetime.
- // Do not confuse with read/write transactions. We need a
- // "one exclusive, or many other" lock which happens to be called ReadWriteLock
- // See also {@code lock} which protects the datastructures during transaction management.
- private ReadWriteLock exclusivitylock = new ReentrantReadWriteLock() ;
-
- // Coordinator wide lock object.
- private Object coordinatorLock = new Object() ;
-
- @FunctionalInterface
- public interface ShutdownHook { void shutdown() ; }
-
- /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */
- public TransactionCoordinator(Location location) {
- this(Journal.create(location)) ;
- }
-
- /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */
- public TransactionCoordinator(Journal journal) {
- this(journal, null , new ArrayList<>()) ;
- }
-
- /** Create a TransactionCoordinator, initially with {@link TransactionalComponent} in the ComponentGroup */
- public TransactionCoordinator(Journal journal, List<TransactionalComponent> components) {
- this(journal, components , new ArrayList<>()) ;
- }
-
- // /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */
-// public TransactionCoordinator(Location journalLocation) {
-// this(Journal.create(journalLocation), new ArrayList<>() , new ArrayList<>()) ;
-// }
-
- private TransactionCoordinator(Journal journal, List<TransactionalComponent> txnComp, List<ShutdownHook> shutdownHooks) {
- this.journal = journal ;
- this.shutdownHooks = new ArrayList<>(shutdownHooks) ;
- if ( txnComp != null ) {
- //txnComp.forEach(x-> System.out.println(x.getComponentId().label()+" :: "+Bytes.asHex(x.getComponentId().bytes()) ) ) ;
- txnComp.forEach(components::add);
- }
- }
-
- /** Add a {@link TransactionalComponent}.
- * Must be called before {@link #start()}; it is good practice to add all the
- * components before any transactions start.
- * Internally, the coordinator ensures the add will safely happen but it
- * does not add the component to existing transactions.
- * This must be setup before recovery is attempted.
- */
- public TransactionCoordinator add(TransactionalComponent elt) {
- checkSetup() ;
- synchronized(coordinatorLock) {
- components.add(elt) ;
- }
- return this ;
- }
-
- /**
- * Remove a {@link TransactionalComponent}.
- * @see #add
- */
- public TransactionCoordinator remove(TransactionalComponent elt) {
- checkSetup() ;
- synchronized(coordinatorLock) {
- components.remove(elt.getComponentId()) ;
- }
- return this ;
- }
-
- /**
- * Add a shutdown hook. Shutdown is not guaranteed to be called
- * and hence hooks may not get called.
- */
- public void add(TransactionCoordinator.ShutdownHook hook) {
- checkSetup() ;
- synchronized(coordinatorLock) {
- shutdownHooks.add(hook) ;
- }
- }
-
- /** Remove a shutdown hook */
- public void remove(TransactionCoordinator.ShutdownHook hook) {
- checkSetup() ;
- synchronized(coordinatorLock) {
- shutdownHooks.remove(hook) ;
- }
- }
-
- public void setQuorumGenerator(QuorumGenerator qGen) {
- checkSetup() ;
- this.quorumGenerator = qGen ;
- }
-
- public void start() {
- checkSetup() ;
- recovery() ;
- coordinatorStarted = true ;
- }
-
- private /*public*/ void recovery() {
-
- Iterator<JournalEntry> iter = journal.entries() ;
- if ( ! iter.hasNext() ) {
- components.forEachComponent(c -> c.cleanStart()) ;
- return ;
- }
-
- log.info("Journal recovery start") ;
- components.forEachComponent(c -> c.startRecovery()) ;
-
- // Group to commit
-
- List<JournalEntry> entries = new ArrayList<>() ;
-
- iter.forEachRemaining( entry -> {
- switch(entry.getType()) {
- case ABORT :
- entries.clear() ;
- break ;
- case COMMIT :
- recover(entries) ;
- entries.clear() ;
- break ;
- case REDO : case UNDO :
- entries.add(entry) ;
- break ;
- }
- }) ;
-
- components.forEachComponent(c -> c.finishRecovery()) ;
- journal.reset() ;
- log.info("Journal recovery end") ;
- }
-
- /**
-  * Replay one committed group of journal entries against the registered components.
-  * UNDO entries are not handled and are skipped with a warning, as are entries
-  * whose component id is not registered with this coordinator.
-  *
-  * @param entries the journal entries collected up to a COMMIT marker
-  */
- private void recover(List<JournalEntry> entries) {
-     for ( JournalEntry entry : entries ) {
-         if ( entry.getType() == UNDO ) {
-             Log.warn(TransactionCoordinator.this, "UNDO entry : not handled") ;
-             continue ;
-         }
-         ComponentId componentId = entry.getComponentId() ;
-         ByteBuffer payload = entry.getByteBuffer() ;
-         // Look up the component that wrote this entry.
-         TransactionalComponent component = components.findComponent(componentId) ;
-         if ( component == null ) {
-             Log.warn(TransactionCoordinator.this, "No component for "+componentId) ;
-             continue ;
-         }
-         component.recover(payload);
-     }
- }
-
- public void setTxnIdGenerator(TxnIdGenerator generator) {
- this.txnIdGenerator = generator ;
- }
-
- public Journal getJournal() {
- return journal ;
- }
-
- public TransactionCoordinatorState detach(Transaction txn) {
- txn.detach();
- TransactionCoordinatorState coordinatorState = new TransactionCoordinatorState(txn) ;
- components.forEach((id, c) -> {
- SysTransState s = c.detach() ;
- coordinatorState.componentStates.put(id, s) ;
- } ) ;
- // The txn still counts as "active" for tracking purposes below.
- return coordinatorState ;
- }
-
- public void attach(TransactionCoordinatorState coordinatorState) {
- Transaction txn = coordinatorState.transaction ;
- txn.attach() ;
- coordinatorState.componentStates.forEach((id, obj) -> {
- components.findComponent(id).attach(obj);
- });
- }
-
- public void shutdown() {
- if ( coordinatorLock == null )
- return ;
- components.forEach((id, c) -> c.shutdown()) ;
- shutdownHooks.forEach((h)-> h.shutdown()) ;
- coordinatorLock = null ;
- journal.close();
- }
-
- // Are we in the initialization phase?
- private void checkSetup() {
- if ( coordinatorStarted )
- throw new TransactionException("TransactionCoordinator has already been started") ;
- }
-
- // Are we up and running?
- private void checkActive() {
- if ( ! coordinatorStarted )
- throw new TransactionException("TransactionCoordinator has not been started") ;
- checkNotShutdown();
- }
-
- // Check not wrapped up
- private void checkNotShutdown() {
- if ( coordinatorLock == null )
- throw new TransactionException("TransactionCoordinator has been shutdown") ;
- }
-
- private void releaseWriterLock() {
- int x = writersWaiting.availablePermits() ;
- if ( x != 0 )
- throw new TransactionException("TransactionCoordinator: Probably mismatch of enable/disableWriter calls") ;
- writersWaiting.release() ;
- }
-
- /**
-  * Acquire the single-writer permit from {@code writersWaiting}.
-  * <p>
-  * If the thread is interrupted while waiting, its interrupt status is restored
-  * before the {@link TransactionException} is thrown, so code further up the
-  * stack can still observe the interruption.
-  *
-  * @param canBlock if true, wait for the permit; if false, make a single non-blocking attempt
-  * @return true if the permit was acquired; false only when {@code canBlock} is false
-  *         and the permit was not available
-  * @throws TransactionException if the thread is interrupted while waiting for the permit
-  */
- private boolean acquireWriterLock(boolean canBlock) {
-     if ( ! canBlock )
-         return writersWaiting.tryAcquire() ;
-     try {
-         writersWaiting.acquire() ;
-         return true;
-     } catch (InterruptedException e) {
-         // Semaphore.acquire() clears the interrupt flag when it throws; put it back
-         // so the interruption is not lost behind the unchecked exception.
-         Thread.currentThread().interrupt() ;
-         throw new TransactionException(e) ;
-     }
- }
-
- /** Enter exclusive mode; block if necessary.
- * There are no active transactions on return; new transactions will be held up in 'begin'.
- * Return to normal (release waiting transactions, allow new transactions)
- * with {@link #finishExclusiveMode}.
- * <p>
- * Do not call inside an existing transaction.
- */
- public void startExclusiveMode() {
- startExclusiveMode(true);
- }
-
- /** Try to enter exclusive mode.
- * If return is true, then there are no active transactions on return and new transactions will be held up in 'begin'.
- * If false, there were in-progress transactions.
- * Return to normal (release waiting transactions, allow new transactions)
- * with {@link #finishExclusiveMode}.
- * <p>
- * Do not call inside an existing transaction.
- */
- public boolean tryExclusiveMode() {
- return tryExclusiveMode(false);
- }
-
- /** Try to enter exclusive mode.
- * If return is true, then there are no active transactions on return and new transactions will be held up in 'begin'.
- * If false, there were in-progress transactions.
- * Return to normal (release waiting transactions, allow new transactions)
- * with {@link #finishExclusiveMode}.
- * <p>
- * Do not call inside an existing transaction.
- * @param canBlock Allow the operation block and wait for the exclusive mode lock.
- */
- public boolean tryExclusiveMode(boolean canBlock) {
- return startExclusiveMode(canBlock);
- }
-
- private boolean startExclusiveMode(boolean canBlock) {
- if ( canBlock ) {
- exclusivitylock.writeLock().lock() ;
- return true ;
- }
- return exclusivitylock.writeLock().tryLock() ;
- }
-
- /** Return to normal (release waiting transactions, allow new transactions).
- * Must be paired with an earlier {@link #startExclusiveMode}.
- */
- public void finishExclusiveMode() {
- exclusivitylock.writeLock().unlock() ;
- }
-
- /** Execute an action in exclusive mode. This method can block.
- * Equivalent to:
- * <pre>
- * startExclusiveMode() ;
- * try { action.run(); }
- * finally { finishExclusiveMode(); }
- * </pre>
- *
- * @param action
- */
- public void execExclusive(Runnable action) {
- startExclusiveMode() ;
- try { action.run(); }
- finally { finishExclusiveMode(); }
- }
-
- /** Block until no writers are active.
- * When this returns, this guarantees that the database is not changing
- * and the journal is flushed to disk.
- * <p>
- * The application must call {@link #enableWriters} later.
- * <p>
- * This operation must not be nested (it will block).
- *
- * @see #tryBlockWriters()
- * @see #enableWriters()
- *
- */
- public void blockWriters() {
- acquireWriterLock(true) ;
- }
-
- /** Try to block all writers, or return if can't at the moment.
- * <p>
- * Unlike a write transaction, there is no associated transaction.
- * <p>
- * If it returns true, the application must call {@link #enableWriters} later.
- *
- * @see #blockWriters()
- * @see #enableWriters()
-
- * @return true if the operation succeeded and writers are blocked
- */
- public boolean tryBlockWriters() {
- return tryBlockWriters(false) ;
- }
-
- /**
- * Block until no writers are active, optionally blocking or returning if can't at the moment.
- * <p>
- * Unlike a write transaction, there is no associated transaction.
- * <p>
- * If it returns true, the application must call {@link #enableWriters} later.
- * @param canBlock
- * @return true if the operation succeeded and writers are blocked
- */
- public boolean tryBlockWriters(boolean canBlock) {
- return acquireWriterLock(canBlock) ;
- }
- /** Allow writers.
- * This must be used in conjunction with {@link #blockWriters()} or {@link #tryBlockWriters()}
- *
- * @see #blockWriters()
- * @see #tryBlockWriters()
- */
- public void enableWriters() {
- releaseWriterLock();
- }
-
- /** Execute an action as if a writer, but with no write transaction started.
- * This method can block.
- * <p>
- * Equivalent to:
- * <pre>
- * blockWriters() ;
- * try { action.run(); }
- * finally { enableWriters(); }
- * </pre>
- *
- * @param action
- */
- public void execAsWriter(Runnable action) {
- blockWriters() ;
- try { action.run(); }
- finally { enableWriters(); }
- }
-
- /** Start a transaction. This may block. */
- public Transaction begin(ReadWrite readWrite) {
- return begin(readWrite, true) ;
- }
-
- /**
-  * Start a transaction. Returns null if this operation would block.
-  * Readers can start at any time.
-  * A single writer policy is currently imposed so a "begin(WRITE)"
-  * may block.
-  * <p>
-  * Every transaction takes the read side of {@code exclusivitylock}; a WRITE
-  * transaction additionally takes the writer permit. If acquiring the permit or
-  * creating the transaction fails with an exception, whatever was acquired here
-  * is released again before the exception propagates, so a failed begin does not
-  * hold up later transactions.
-  *
-  * @param readWrite the transaction mode; must not be null
-  * @param canBlock  if false, return null instead of waiting for a lock
-  * @return the started transaction, or null if {@code canBlock} is false and a
-  *         required lock was not immediately available
-  * @throws NullPointerException if {@code readWrite} is null
-  * @throws TransactionException if the coordinator has not been started or has been shut down
-  */
- public Transaction begin(ReadWrite readWrite, boolean canBlock) {
-     // requireNonNull throws; Objects.nonNull merely returns a boolean.
-     Objects.requireNonNull(readWrite, "readWrite") ;
-     checkActive() ;
-
-     // XXX Flag to bounce writers for long term "block writers"
-     if ( false /* bounceWritersAtTheMoment */) {
-         if ( readWrite == WRITE ) {
-             throw new TransactionException("Writers currently being rejected");
-         }
-     }
-
-     if ( canBlock )
-         exclusivitylock.readLock().lock() ;
-     else {
-         if ( ! exclusivitylock.readLock().tryLock() )
-             return null ;
-     }
-
-     // Readers never block.
-     final boolean isWriter = ( readWrite == WRITE ) ;
-     if ( isWriter ) {
-         // Writers take a WRITE permit from the semaphore to ensure there
-         // is at most one active writer, else the attempt to start the
-         // transaction blocks.
-         // Released in notifyCommitFinish/notifyAbortFinish
-         boolean gotPermit ;
-         try {
-             gotPermit = acquireWriterLock(canBlock) ;
-         } catch (RuntimeException | Error ex) {
-             // e.g. interrupted while waiting: do not leave the read lock held.
-             exclusivitylock.readLock().unlock() ;
-             throw ex ;
-         }
-         if ( ! gotPermit ) {
-             exclusivitylock.readLock().unlock() ;
-             return null ;
-         }
-     }
-
-     Transaction transaction ;
-     try {
-         transaction = begin$(readWrite) ;
-     } catch (RuntimeException | Error ex) {
-         // No transaction exists to release these later, so release them now.
-         if ( isWriter )
-             releaseWriterLock() ;
-         exclusivitylock.readLock().unlock() ;
-         throw ex ;
-     }
-     startActiveTransaction(transaction) ;
-     transaction.begin();
-     return transaction;
- }
-
- // The version is the serialization point for a transaction.
- // All transactions on the same view of the data get the same serialization point.
-
- // A read transaction can be promoted if writer does not start
- // This TransactionCoordinator provides Serializable, Read-lock-free
- // execution. With no item locking, a read can only be promoted
- // if no writer started since the reader started.
-
- /* The version of the data - incremented when a write transaction commits.
- * This is the version with respect to the last committed transaction.
- * Aborts do not cause the data version to advance.
- * This counter never goes backwards.
- */
- private final AtomicLong dataVersion = new AtomicLong(0) ;
-
- private Transaction begin$(ReadWrite readWrite) {
- synchronized(coordinatorLock) {
- // Thread safe part of 'begin'
- // Allocate the transaction serialization point.
- TxnId txnId = txnIdGenerator.generate() ;
- List<SysTrans> sysTransList = new ArrayList<>() ;
- Transaction transaction = new Transaction(this, txnId, readWrite, dataVersion.get(), sysTransList) ;
-
- ComponentGroup txnComponents = chooseComponents(this.components, readWrite) ;
-
- try {
- txnComponents.forEachComponent(elt -> {
- SysTrans sysTrans = new SysTrans(elt, transaction, txnId) ;
- sysTransList.add(sysTrans) ; }) ;
- // Calling each component must be inside the lock
- // so that a transaction does not commit overlapping with setup.
- // If it did, different components might end up starting from
- // different start states of the overall system.
- txnComponents.forEachComponent(elt -> elt.begin(transaction)) ;
- } catch(Throwable ex) {
- // Careful about incomplete.
- //abort() ;
- //complete() ;
- throw ex ;
- }
- return transaction ;
- }
- }
-
- /**
-  * Choose the components that take part in a new transaction.
-  * <p>
-  * With no quorum generator, or when the generator returns null, all of
-  * {@code components} take part. Otherwise the generated group is used; each of
-  * its members is checked against {@code components} and a warning is logged for
-  * any member that is missing or is a different object.
-  *
-  * @param components the coordinator's full component group
-  * @param readWrite  the mode of the transaction being started
-  * @return the group of components to use for the transaction
-  */
- private ComponentGroup chooseComponents(ComponentGroup components, ReadWrite readWrite) {
-     if ( quorumGenerator == null )
-         return components ;
-     ComponentGroup cg = quorumGenerator.genQuorum(readWrite) ;
-     if ( cg == null )
-         return components ;
-     cg.forEach((id, c) -> {
-         TransactionalComponent tcx = components.findComponent(id) ;
-         // findComponent can return null for an unknown id (recover() checks for this);
-         // that is exactly the case to warn about, not to fail on with an NPE.
-         if ( tcx == null || ! tcx.equals(c) )
-             log.warn("TransactionalComponent not in TransactionCoordinator's ComponentGroup") ;
-     }) ;
-     if ( log.isDebugEnabled() )
-         log.debug("Custom ComponentGroup for transaction "+readWrite+": size="+cg.size()+" of "+components.size()) ;
-     return cg ;
- }
-
- /** Is promotion of transactions enabled? */
- /*private*/public/*for development*/ static boolean promotion = true ;
-
- /** Control of whether a transaction promotion can see any commits that
- * happened between this transaction starting and it promoting.
- * A form of "ReadCommitted".
- */
- /*private*/public/*for development*/ static boolean readCommittedPromotion = false ;
-
- /** Whether to wait for writers when trying to promote */
- private static final boolean promotionWaitForWriters = true;
-
- /** Attempt to promote a transaction from READ to WRITE.
- * No-op (returns true) for a transaction that is already a writer.
- * <p>
- * Returns false, rather than throwing, when the promotion can not be done:
- * promotion is disabled, the data version has advanced since the transaction
- * started (unless {@code readCommittedPromotion} is set), the writer lock
- * could not be obtained, or promoting the components threw a
- * {@link TransactionException}. In that last case the transaction is aborted
- * and the writer lock is released before returning.
- *
- * @param transaction the transaction to promote
- * @return true if the transaction was already a writer or was promoted, false otherwise
- */
- /*package*/ boolean promoteTxn(Transaction transaction) {
- if ( ! promotion )
- return false;
-
- if ( transaction.getMode() == WRITE )
- return true ;
-
- // Has there been a writer active since the transaction started?
- // Do a test outside the lock - only dataVersion can change and that increases.
- // If "read committed transactions" are not allowed, the data has changed in a way we
- // do not wish to expose.
- // If this test fails outside the lock it will fail inside.
- // If it passes, we have to test again in case there is an active writer.
-
- if ( ! readCommittedPromotion ) {
- long txnEpoch = transaction.getDataVersion() ; // The transaction-start point.
- long currentEpoch = dataVersion.get() ; // The data serialization point.
-
- if ( txnEpoch < currentEpoch )
- // The data has changed and "read committed" not allowed.
- // We can reject now.
- return false ;
- }
-
- // Once we have acquireWriterLock, we are single writer.
- // We may have to discard writer status because, once we can make the definite
- // decision on promotion, we find we can't promote after all.
- if ( readCommittedPromotion ) {
- /*
- * acquireWriterLock(true) ;
- * synchronized(coordinatorLock) {
- * begin$ ==>
- * reset transaction.
- * promote components
- * reset dataVersion
- */
- acquireWriterLock(true) ;
- synchronized(coordinatorLock) {
- try {
- transaction.promoteComponents() ;
- // Because we want to see the new state of the data.
- //transaction.resetDataVersion(dataVersion.get());
- } catch (TransactionException ex) {
- try { transaction.abort(); } catch(RuntimeException ex2) {}
- releaseWriterLock();
- return false ;
- }
- }
- return true;
- }
-
- if ( ! waitForWriters() )
- // Failed to become a writer.
- return false;
- // Now a proto-writer.
-
- synchronized(coordinatorLock) {
- // Not read committed.
- // Need to check the data version once we are the writer and all previous
- // writers have committed or aborted.
- // Has there been a writer active since the transaction started?
- long txnEpoch = transaction.getDataVersion() ; // The transaction-start point.
- long currentEpoch = dataVersion.get() ; // The data serialization point.
-
- if ( txnEpoch != currentEpoch ) {
- // Failed to promote.
- releaseWriterLock();
- return false ;
- }
-
- // ... we have now got the writer lock ...
- try {
- transaction.promoteComponents() ;
- // No need to reset the data version because strict isolation.
- } catch (TransactionException ex) {
- try { transaction.abort(); } catch(RuntimeException ex2) {}
- releaseWriterLock();
- return false ;
- }
- }
- return true ;
- }
-
- private boolean waitForWriters() {
- if ( promotionWaitForWriters )
- return acquireWriterLock(true) ;
- else
- return acquireWriterLock(false) ;
- }
-
- // Called once by Transaction after the action of commit()/abort() or end()
- /** Signal that the transaction has finished. */
- /*package*/ void completed(Transaction transaction) {
- finishActiveTransaction(transaction);
- journal.reset() ;
- }
-
- /*package*/ void executePrepare(Transaction transaction) {
- // Do here because it needs access to the journal.
- notifyPrepareStart(transaction);
- transaction.getComponents().forEach(sysTrans -> {
- TransactionalComponent c = sysTrans.getComponent() ;
- ByteBuffer data = c.commitPrepare(transaction) ;
- if ( data != null ) {
- PrepareState s = new PrepareState(c.getComponentId(), data) ;
- journal.write(s) ;
- }
- }) ;
- notifyPrepareFinish(transaction);
- }
-
- /*package*/ void executeCommit(Transaction transaction, Runnable commit, Runnable finish) {
- // This is the commit point.
- synchronized(coordinatorLock) {
- // *** COMMIT POINT
- journal.sync() ;
- // *** COMMIT POINT
- // Now run the Transactions commit actions.
- commit.run() ;
- journal.truncate(0) ;
- // and tell the Transaction it's finished.
- finish.run() ;
- // Bump global serialization point if necessary.
- if ( transaction.getMode() == WRITE )
- advanceDataVersion() ;
- notifyCommitFinish(transaction) ;
- }
- }
-
- // Inside the global transaction start/commit lock.
- private void advanceDataVersion() {
- dataVersion.incrementAndGet();
- }
-
- /*package*/ void executeAbort(Transaction transaction, Runnable abort) {
- notifyAbortStart(transaction) ;
- abort.run();
- notifyAbortFinish(transaction) ;
- }
-
- // Active transactions: this is (the missing) ConcurrentHashSet
- private final static Object dummy = new Object() ;
- private ConcurrentHashMap<Transaction, Object> activeTransactions = new ConcurrentHashMap<>() ;
- private AtomicLong activeTransactionCount = new AtomicLong(0) ;
- private AtomicLong activeReadersCount = new AtomicLong(0) ;
- private AtomicLong activeWritersCount = new AtomicLong(0) ;
-
- private void startActiveTransaction(Transaction transaction) {
- synchronized(coordinatorLock) {
- // Use lock to ensure all the counters move together.
- // Thread safe - we have not let the Transaction object out yet.
- countBegin.incrementAndGet() ;
- switch(transaction.getMode()) {
- case READ: countBeginRead.incrementAndGet() ; activeReadersCount.incrementAndGet() ; break ;
- case WRITE: countBeginWrite.incrementAndGet() ; activeWritersCount.incrementAndGet() ; break ;
- }
- activeTransactionCount.incrementAndGet() ;
- activeTransactions.put(transaction, dummy) ;
- }
- }
-
- private void finishActiveTransaction(Transaction transaction) {
- synchronized(coordinatorLock) {
- // Idempotent.
- Object x = activeTransactions.remove(transaction) ;
- if ( x == null )
- return ;
- countFinished.incrementAndGet() ;
- activeTransactionCount.decrementAndGet() ;
- switch(transaction.getMode()) {
- case READ: activeReadersCount.decrementAndGet() ; break ;
- case WRITE: activeWritersCount.decrementAndGet() ; break ;
- }
- }
- exclusivitylock.readLock().unlock() ;
- }
-
- public long countActiveReaders() { return activeReadersCount.get() ; }
- public long countActiveWriter() { return activeWritersCount.get() ; }
- public long countActive() { return activeTransactionCount.get(); }
-
- // notify*Start/Finish are called around each transaction lifecycle step.
- // Called in cooperation between Transaction and TransactionCoordinator,
- // depending on which of them actually does the work of each step.
-
- /*package*/ void notifyPrepareStart(Transaction transaction) {}
-
- /*package*/ void notifyPrepareFinish(Transaction transaction) {}
-
- // Writers released here - can happen because of commit() or abort().
-
- private void notifyCommitStart(Transaction transaction) {}
-
- private void notifyCommitFinish(Transaction transaction) {
- if ( transaction.getMode() == WRITE )
- releaseWriterLock();
- }
-
- private void notifyAbortStart(Transaction transaction) { }
-
- private void notifyAbortFinish(Transaction transaction) {
- if ( transaction.getMode() == WRITE )
- releaseWriterLock();
- }
-
- /*package*/ void notifyEndStart(Transaction transaction) { }
-
- /*package*/ void notifyEndFinish(Transaction transaction) {}
-
- // Called by Transaction once at the end of first commit()/abort() or end()
-
- /*package*/ void notifyCompleteStart(Transaction transaction) { }
-
- /*package*/ void notifyCompleteFinish(Transaction transaction) { }
-
- // Coordinator state.
- private final AtomicLong countBegin = new AtomicLong(0) ;
-
- private final AtomicLong countBeginRead = new AtomicLong(0) ;
-
- private final AtomicLong countBeginWrite = new AtomicLong(0) ;
-
- private final AtomicLong countFinished = new AtomicLong(0) ;
-
- // Access counters
- public long countBegin() { return countBegin.get() ; }
-
- public long countBeginRead() { return countBeginRead.get() ; }
-
- public long countBeginWrite() { return countBeginWrite.get() ; }
-
- public long countFinished() { return countFinished.get() ; }
-}
-
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinatorState.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinatorState.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinatorState.java
deleted file mode 100644
index 7402cc1..0000000
--- a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinatorState.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.transaction.txn;
-
-import java.util.HashMap ;
-import java.util.Map ;
-
-/**
- * Holder for a {@link Transaction} detached from its coordinator, together with
- * the per-component {@link SysTransState} keyed by {@link ComponentId}.
- */
-public class TransactionCoordinatorState {
-    // Per-component state; the map is created empty here.
-    /*package*/Map<ComponentId, SysTransState> componentStates = new HashMap<>();
-    /*package*/final Transaction transaction ;
-
-    /*package*/ TransactionCoordinatorState(Transaction transaction) {
-        this.transaction = transaction ;
-    }
-
-    /** Return the transaction this state was created for. */
-    public Transaction getTransaction() {
-        return this.transaction ;
-    }
-}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionException.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionException.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionException.java
deleted file mode 100644
index f15a22b..0000000
--- a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.transaction.txn;
-
-import org.apache.jena.sparql.JenaTransactionException ;
-
-public class TransactionException extends JenaTransactionException {
- public TransactionException() { super(); }
- public TransactionException(String message) { super(message); }
- public TransactionException(Throwable cause) { super(cause) ; }
- public TransactionException(String message, Throwable cause) { super(message, cause) ; }
-
-}
-
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionInfo.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionInfo.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionInfo.java
deleted file mode 100644
index a0a9483..0000000
--- a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionInfo.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.transaction.txn;
-
-import static org.seaborne.dboe.transaction.txn.TxnState.INACTIVE ;
-import org.apache.jena.query.ReadWrite ;
-
-/**
- * A view that provides information about a transaction
- * @see Transaction
- */
-public interface TransactionInfo {
-
- /** The transaction lifecycle state */
- public TxnState getState() ;
-
- /**
- * Each transaction is allocated a serialization point by the transaction
- * coordinator. Normally, this is related to this number and it increases
- * over time as the data changes. Two readers can have the same
- * serialization point - they are working with the same view of the data.
- */
- public long getDataVersion() ;
-
- /** Has the transaction started? */
- public boolean hasStarted() ;
-
- /** Has the transaction finished (has commit/abort/end been called)? */
- public boolean hasFinished() ;
-
- /** Has the transaction gone through all lifecycle states? */
- public boolean hasFinalised() ;
-
- /** Get the transaction id for this transaction. Unique within this OS process (JVM) at least. */
- public TxnId getTxnId() ;
-
- /** What mode is this transaction?
- * This may change from {@code READ} to {@code WRITE} in a transaction's lifetime.
- */
- public ReadWrite getMode() ;
-
- /** Is this a view of a READ transaction?
- * Convenience operation equivalent to {@code (getMode() == READ)}
- */
- public default boolean isReadTxn() { return getMode() == ReadWrite.READ ; }
-
- /** Is this a view of a WRITE transaction?
- * Convenience operation equivalent to {@code (getMode() == WRITE)}
- */
- public default boolean isWriteTxn() { return getMode() == ReadWrite.WRITE ; }
-
- /** Is this a view of a transaction that is active?
- * Equivalent to {@code getState() != INACTIVE}
- */
- public default boolean isActiveTxn() {
- return getState() != INACTIVE ;
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalBase.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalBase.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalBase.java
deleted file mode 100644
index 0e88c57..0000000
--- a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalBase.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.transaction.txn;
-
-import java.util.Objects ;
-
-import org.apache.jena.atlas.logging.Log ;
-import org.apache.jena.query.ReadWrite ;
-
-/**
- * Framework for implementing a Transactional.
- */
-
-public class TransactionalBase implements TransactionalSystem {
- // Help debugging by generating names for Transactionals.
- private final String label ;
- protected boolean isShutdown = false ;
- protected final TransactionCoordinator txnMgr ;
-
- // Per thread transaction.
- private final ThreadLocal<Transaction> theTxn = new ThreadLocal<>() ;
-
- public TransactionalBase(String label, TransactionCoordinator txnMgr) {
- this.label = label ;
- this.txnMgr = txnMgr ;
- }
-
- public TransactionalBase(TransactionCoordinator txnMgr) {
- this(null, txnMgr) ;
- }
-
- @Override
- public TransactionCoordinator getTxnMgr() {
- return txnMgr ;
- }
-
- // Development
- private static final boolean trackAttachDetach = false ;
-
- @Override
- public TransactionCoordinatorState detach() {
- if ( trackAttachDetach )
- Log.info(this, ">> detach");
- checkRunning() ;
- // Not if it has just committed but before end.
- //checkActive() ;
- Transaction txn = theTxn.get() ;
- TransactionCoordinatorState coordinatorState = null ;
- if ( txn != null )
- // We are not ending.
- coordinatorState = txnMgr.detach(txn) ;
- if ( trackAttachDetach )
- Log.info(this, " theTxn = "+txn) ;
- theTxn.remove() ; // Clear this thread's transaction slot, even when there was nothing to detach.
- if ( trackAttachDetach )
- Log.info(this, "<< detach");
- if ( coordinatorState == null )
- throw new TransactionException("Not attached") ;
- return coordinatorState ;
- }
-
- @Override
- public void attach(TransactionCoordinatorState coordinatorState) {
- if ( trackAttachDetach )
- Log.info(this, ">> attach");
- Objects.requireNonNull(coordinatorState) ; // fail fast with NullPointerException on a null state
- checkRunning() ;
- checkNotActive() ;
- TxnState txnState = coordinatorState.transaction.getState() ; // only a DETACHED transaction may be re-attached
- if ( txnState != TxnState.DETACHED )
- throw new TransactionException("Not a detached transaction") ;
- txnMgr.attach(coordinatorState) ;
- if ( trackAttachDetach )
- Log.info(this, " theTxn = "+coordinatorState.transaction) ;
- theTxn.set(coordinatorState.transaction); // make it this thread's current transaction
- if ( trackAttachDetach )
- Log.info(this, "<< attach");
- }
-
- @Override
- public final void begin(ReadWrite readWrite) {
- Objects.requireNonNull(readWrite) ; // fail fast with NullPointerException on a null mode
- checkRunning() ;
- checkNotActive() ;
- Transaction transaction = txnMgr.begin(readWrite) ;
- theTxn.set(transaction) ; // record as this thread's current transaction
- }
-
- @Override
- public boolean promote() {
- checkActive() ;
- Transaction txn = getValidTransaction() ;
- return txn.promote() ;
- }
-
- @Override
- public final void commit() {
- checkRunning() ;
- TransactionalSystem.super.commit() ;
- }
-
- @Override
- public void commitPrepare() {
- Transaction txn = getValidTransaction() ;
- txn.prepare() ;
- }
-
- @Override
- public void commitExec() {
- Transaction txn = getValidTransaction() ;
- txn.commit() ;
- _end() ;
- }
-
-// /** Signal end of commit phase */
-// @Override
-// public void commitEnd() {
-// _end() ;
-// }
-
- @Override
- public final void abort() {
- checkRunning() ;
- checkActive() ;
- Transaction txn = getValidTransaction() ;
- try { txn.abort() ; }
- finally { _end() ; }
- }
-
- @Override
- public final void end() {
- checkRunning() ;
- // Don't check if active or if any thread locals exist
- // because this may have already been called.
- // txn.get() ; -- may be null -- test repeat calls.
- _end() ;
- }
-
- /**
- * Return the Read/write state (or null when not in a transaction)
- */
- @Override
- final
- public ReadWrite getState() {
- checkRunning() ;
- // tricky - touching theTxn causes it to initialize.
- Transaction txn = theTxn.get() ;
- if ( txn != null )
- return txn.getMode() ;
- theTxn.remove() ;
- return null ;
- }
-
- @Override
- final
- public TransactionInfo getTransactionInfo() {
- return getThreadTransaction() ;
- }
-
- @Override
- final
- public Transaction getThreadTransaction() {
- Transaction txn = theTxn.get() ;
- // Touched the thread local so it is defined now.
-// if ( txn == null )
-// theTxn.remove() ;
- return txn ;
- }
-
- /** Get the transaction, checking there is one */
- private Transaction getValidTransaction() {
- Transaction txn = theTxn.get() ;
- if ( txn == null )
- throw new TransactionException("Not in a transaction") ;
- return txn ;
- }
-
- private void checkRunning() {
-// if ( ! hasStarted )
-// throw new TransactionException("Not started") ;
-
- if ( isShutdown )
- throw new TransactionException("Shutdown") ;
- }
-
- /**
- * Shutdown component, aborting any in-progress transactions. This operation
- * is not guaranteed to be called.
- */
- public void shutdown() {
- txnMgr.shutdown() ;
- isShutdown = true ;
- }
-
- protected String label(String msg) {
- if ( label == null )
- return msg ;
- return label+": "+msg ;
- }
-
- final
- protected void checkActive() {
- checkNotShutdown() ;
- if ( ! isInTransaction() )
- throw new TransactionException(label("Not in an active transaction")) ;
- }
-
- final
- protected void checkNotActive() {
- checkNotShutdown() ;
- if ( isInTransaction() )
- throw new TransactionException(label("Currently in an active transaction")) ;
- }
-
- final
- protected void checkNotShutdown() {
- if ( isShutdown )
- throw new TransactionException(label("Already shutdown")) ;
- }
-
- private final void _end() {
- Transaction txn = theTxn.get() ;
- if ( txn != null ) {
- try {
- // Can throw an exception on begin(W)...end().
- txn.end() ;
- } finally {
- theTxn.set(null) ;
- theTxn.remove() ;
- }
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponent.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponent.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponent.java
deleted file mode 100644
index 5057ae6..0000000
--- a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponent.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.transaction.txn;
-
-import java.nio.ByteBuffer ;
-
-/** Interface for components of a transaction system.
-* <p><br/>
-* The {@link TransactionCoordinator} manages a number of components
-* which provide the {@link TransactionalComponent} interface.
-* <p><br/>
-* When a new coordinator starts, typically being when the in-process system starts,
-* there is a recovery phase when work from a previous coordinator is recovered.
-* Transactions were either properly committed by the previous coordinator,
-* and hence redo actions (finalization) should be done,
-* or they were not, in which case undo actions may be needed.
-* Transactions to discard are not notified; only fully committed transactions are
-* notified during recovery. The component may need to keep its own record of
-* undo actions needed across restarts.
-* <p><br/>
-* Lifecycle of startup:
-* <ul>
-* <li>{@link #startRecovery}
-* <li>{@link #recover} for each committed/durable transaction (redo actions)
-* <li>{@link #finishRecovery}, discarding any other transactions (undo actions).
-* </ul>
-* <p><br/>
-* Lifecycle of a read transaction:
-* <ul>
-* <li>{@link #begin}
-* <li>{@link #complete}
-* </ul>
-* <br/>
-* A read transaction may also include {@code commit} or {@code abort} lifecycles.
-* {@link #commitPrepare} and {@link #commitEnd} are not called.
-*<p><br/>
-* Lifecycle of a write transaction: <ul>
-* <li>{@link #begin}
-* <li>{@link #commitPrepare}
-* <li>{@link #commit} or {@link #abort}
-* <li>{@link #commitEnd}
-* <li>{@link #complete} including abort
-* </ul>
-* <br/>
-* or if the application aborts the transaction:
-* <ul>
-* <li>{@link #begin}
-* <li>{@link #abort}
-* <li>{@link #complete}
-* </ul>
-* <p>
-* {@link #complete} may be called out of sequence and it forces an abort if before
-* {@link #commitPrepare}. Once {@link #commitPrepare} has been called, the component
-* can not decide whether to commit finally or to cause a system abort; it must wait
-* for the coordinator. After {@link #commitEnd}, the coordinator has definitely
-* committed the overall transaction and local prepared state can be released, and changes
-* made to the permanent state of the component.
-*
-* @see Transaction
-* @see TransactionCoordinator
-*/
-
-public interface TransactionalComponent
-{
- /**
- * Every component <i>instance</i> must supply a unique number.
- * It is used to route journal entries to subsystems, including across restarts/recovery.
- * Uniqueness scope is within the same {@link TransactionCoordinator},
- * and the same across restarts.
- * <p>
- * If a component imposes the rule of one-per-{@link TransactionCoordinator},
- * the same number can be used (if different from all other component type instances).
- * <p>
- * If a component can have multiple instances per {@link TransactionCoordinator},
- * for example indexes, each must have a unique instance id.
- */
- public ComponentId getComponentId() ;
-
- // ---- Recovery phase
- public void startRecovery() ;
-
- /** Notification that {@code ref} was really committed and is being recovered.
- *
- * @param ref Same bytes as were written during prepare originally.
- */
- public void recover(ByteBuffer ref) ;
-
- /** End of the recovery phase */
- public void finishRecovery() ;
-
- /** Indicate that no recovery is being done (the journal thinks everything was completed last time) */
- public void cleanStart() ;
-
- // ---- Normal operation
-
- /** Start a transaction for this component (nothing is returned). */
- public void begin(Transaction transaction) ;
-
- /** Promote a component in a transaction.
- * <p>
- * May return "false" for "can't do that" if the transaction can not be promoted.
- * <p>
- * May throw {@link UnsupportedOperationException} if promotion is not supported.
- */
- public boolean promote(Transaction transaction) ;
-
- /** Prepare for a commit.
- * Returns some bytes that will be written to the journal.
- * The journal remains valid until {@link #commitEnd} is called.
- */
- public ByteBuffer commitPrepare(Transaction transaction) ;
-
- /** Commit a transaction (make durable).
- * Other components may not have been committed yet and recovery may still occur.
- * Permanent state should not be finalised until {@link #commitEnd}.
- */
- public void commit(Transaction transaction) ;
-
- /** Signal all commits on all components are done (the component can clear up now) */
- public void commitEnd(Transaction transaction) ;
-
- /** Abort a transaction (undo the effect of a transaction) */
- public void abort(Transaction transaction) ;
-
- /** Finalization - the coordinator will not mention the transaction again
- * although recovery after a crash may do so.
- */
- public void complete(Transaction transaction) ;
-
- // ---- End of operations
-
- /** Detach this component from the transaction of the current thread
- * and return some internal state that can be used in a future call of
- * {@link #attach(SysTransState)}
- * <p>
- * After this call, the component is not in a transaction but the
- * existing transaction still exists. The thread may start a new
- * transaction; that transaction is completely independent of the
- * detached transaction.
- * <p>
- * Returns {@code null} if the current thread is not in a transaction.
- * The component may return null to indicate it has no state.
- * The returned system state should be used in a call to {@link #attach(SysTransState)}
- * and the transaction ended in the usual way.
- *
- */
- public SysTransState detach() ;
-
- /** Set the current thread to be in the transaction. The {@code systemState}
- * must be obtained from a call of {@link #detach()}.
- * This method can only be called once per {@code systemState}.
- */
- public void attach(SysTransState systemState) ;
-
- /** Shutdown component, aborting any in-progress transactions.
- * This operation is not guaranteed to be called.
- */
- public void shutdown() ;
-
-}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentBase.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentBase.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentBase.java
deleted file mode 100644
index 8dc8530..0000000
--- a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentBase.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.transaction.txn;
-
-import java.nio.ByteBuffer ;
-
-import org.apache.jena.query.ReadWrite ;
-
-/**
- * A transaction component that does nothing - can be used as a helper for
- * management tasks hooked into the transaction component lifecycle but which
- * are not stateful across restarts.
- */
-public class TransactionalComponentBase<X> extends TransactionalComponentLifecycle<X> {
-
- public TransactionalComponentBase(ComponentId id) {
- super(id) ;
- }
-
- @Override
- public void startRecovery() {}
-
- @Override
- public void recover(ByteBuffer ref) {}
-
- @Override
- public void finishRecovery() {}
-
- @Override
- public void cleanStart() {}
-
- @Override
- protected X _begin(ReadWrite readWrite, TxnId txnId) {
- return null ;
- }
-
- @Override
- protected ByteBuffer _commitPrepare(TxnId txnId, X state) {
- return null ;
- }
-
- @Override
- protected void _commit(TxnId txnId, X state) {}
-
- @Override
- protected void _commitEnd(TxnId txnId, X state) {}
-
- @Override
- protected void _abort(TxnId txnId, X state) {}
-
- @Override
- protected void _complete(TxnId txnId, X state) {}
-
- @Override
- protected void _shutdown() {}
-
- @Override
- protected X _promote(TxnId txnId, X state) { return null; }
-
-}
-
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentLifecycle.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentLifecycle.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentLifecycle.java
deleted file mode 100644
index f4aeb53..0000000
--- a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentLifecycle.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.transaction.txn;
-
-import static org.seaborne.dboe.transaction.txn.TxnState.* ;
-
-import java.nio.ByteBuffer ;
-import java.util.Objects ;
-
-import org.apache.jena.query.ReadWrite ;
-
-import org.apache.jena.atlas.lib.InternalErrorException ;
-
-/**
- * Base implementation of the component interface for {@link TransactionalComponent}.
- */
-public abstract class TransactionalComponentLifecycle<X> implements TransactionalComponent {
-
- // Pass down recovery operations.
- // This class has no transaction-recorded state.
- @Override public abstract void startRecovery();
- @Override public abstract void recover(ByteBuffer ref);
- @Override public abstract void finishRecovery();
-
- // ---- Normal operation
-
- private static final boolean CHECKING = false ;
- private ThreadLocal<TxnState> trackTxn = CHECKING ? ThreadLocal.withInitial(() -> INACTIVE) : null ;
-
- // Access to these two must be via the getter/setters below only.
- // Allows stuff for thread switching.
- private ThreadLocal<Transaction> threadTxn = new ThreadLocal<>() ;
- private ThreadLocal<X> componentState = new ThreadLocal<>() ;
- private final ComponentId componentId ;
-
- protected TransactionalComponentLifecycle(ComponentId componentId) {
- this.componentId = componentId ;
- }
-
- @Override
- public ComponentId getComponentId() {
- return componentId ;
- }
-
-// // Very dangerous!
-// protected void setForThread(Transaction txn, X state) {
-// threadTxn.set(txn);
-// componentState.set(state);
-// }
-
- /** Start a transaction */
- @Override
- final
- public void begin(Transaction transaction) {
- Objects.requireNonNull(transaction) ;
- setTransaction(transaction);
- checkState(INACTIVE, COMMITTED, ABORTED) ;
- setTrackTxn(ACTIVE);
- X x = _begin(transaction.getMode(), transaction.getTxnId()) ;
- setDataState(x);
- }
-
- /** Promote a component in a transaction */
- @Override
- final
- public boolean promote(Transaction transaction) {
- Objects.requireNonNull(transaction) ;
- checkState(ACTIVE) ;
- X newState = _promote(transaction.getTxnId(), getDataState());
- if ( newState == null )
- return false;
- setDataState(newState);
- return true;
- }
-
- /** Commit a transaction (make durable): prepare - commit - cleanup */
- @Override
- final
- public ByteBuffer commitPrepare(Transaction transaction) {
- checkAligned(transaction) ;
- checkState(ACTIVE) ;
- try { return _commitPrepare(transaction.getTxnId(), getDataState()) ; }
- finally { setTrackTxn(PREPARE) ; }
- }
-
- @Override
- final
- public void commit(Transaction transaction) {
- checkAligned(transaction) ;
- checkState(PREPARE) ;
- _commit(transaction.getTxnId(), getDataState());
- setTrackTxn(COMMIT) ;
- }
-
- @Override
- final
- public void commitEnd(Transaction transaction) {
- checkAligned(transaction) ;
- checkState(COMMIT) ;
- _commitEnd(transaction.getTxnId(), getDataState());
- setTrackTxn(COMMITTED) ;
- internalComplete(transaction) ;
- }
-
- @Override
- final
- public void abort(Transaction transaction) {
- checkAligned(transaction) ;
- checkState(ACTIVE, PREPARE, COMMIT) ;
- _abort(transaction.getTxnId(), getDataState()) ;
- setTrackTxn(ABORTED) ;
- internalComplete(transaction) ;
- }
-
- private void internalComplete(Transaction transaction) {
- _complete(transaction.getTxnId(), getDataState());
- setTrackTxn(INACTIVE) ;
- releaseThreadState() ;
- }
-
- @Override
- final
- public void complete(Transaction transaction) {
- if ( transaction.hasFinished() )
- return ; // nothing to do once the transaction reports it has finished
- checkAligned(transaction) ;
- ReadWrite m = getReadWriteMode() ;
- switch(m) {
- case READ:
- checkState(ACTIVE, COMMITTED, ABORTED) ;
- break ;
- case WRITE:
- // If bad, force abort?
- checkState(COMMITTED, ABORTED) ;
- break ;
- }
- _complete(transaction.getTxnId(), getDataState());
- switch(m) {
- case READ:
- internalComplete(transaction); // NOTE(review): internalComplete() calls _complete() again, so READ reaches it twice - confirm intended
- break ;
- case WRITE:
- // complete happened in the commit or abort.
- break ;
- }
- }
-
- @Override
- final
- public void shutdown() {
- _shutdown() ;
- clearInternal() ;
- }
-
- @Override
- final public SysTransState detach() {
- TxnState txnState = getTxnState() ;
- if ( txnState == null )
- return null ;
- checkState(ACTIVE) ;
- setTrackTxn(DETACHED) ;
- SysTransState transState = new SysTransState(this, getTransaction(), getDataState()) ;
- //****** Thread locals
- releaseThreadState() ;
- return transState ;
- }
-
- @Override
- public void attach(SysTransState state) {
- @SuppressWarnings("unchecked")
- X x = (X)state.getState() ;
-// // reset to not thread not in
-// if ( CHECKING )
-// trackTxn : ThreadLocal<TxnState>
-//
-//
- setTransaction(state.getTransaction());
- setDataState(x);
- setTrackTxn(ACTIVE) ;
- }
-
- // -- Access object members.
-
- public static class ComponentState<X> {
- final TxnState state;
- final Transaction txn ;
- final X componentState ;
- ComponentState(TxnState state, Transaction txn, X componentState) {
- super() ;
- this.state = state ;
- this.txn = txn ;
- this.componentState = componentState ;
- }
- }
-
- public ComponentState<X> getComponentState() {
- return new ComponentState<>(getTrackTxn(), getTransaction(), getDataState()) ;
- }
-
- public void setComponentState(ComponentState<X> state) {
- setTrackTxn(state.state);
- setTransaction(state.txn);
- setDataState(state.componentState);
-
- }
-
- protected void releaseThreadState() {
- // Remove thread locals
- if ( trackTxn != null )
- trackTxn.remove() ;
- componentState.remove();
-
-// java version "1.8.0_31"
-// Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
-// Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode)
-//
-// openjdk version "1.8.0_40-internal"
-// OpenJDK Runtime Environment (build 1.8.0_40-internal-b09)
-// OpenJDK 64-Bit Server VM (build 25.40-b13, mixed mode)
-
- // This one is very important else the memory usage grows. Not clear why.
- // A Transaction has an internal AtomicReference. Replacing the AtomicReference
- // with a plain member variable slows the growth down greatly.
- threadTxn.remove() ;
- }
-
- protected void clearInternal() {
- trackTxn = null ;
- threadTxn = null ;
- componentState = null ;
- }
-
- //protected ComponentState<X> getState()
-
- protected X getDataState() { return componentState.get() ; }
- protected void setDataState(X data) { componentState.set(data) ; }
-
- protected Transaction getTransaction() { return threadTxn.get(); }
- protected void setTransaction(Transaction txn) { threadTxn.set(txn); }
-
- private void setTrackTxn(TxnState newState) {
- if ( ! CHECKING ) return ;
- trackTxn.set(newState);
- }
-
- // This is our record of the state - it is not necessarily the transaction's
- // view during changes. This class tracks the expected incoming state and
- // the transaction
- private TxnState getTrackTxn() {
- if ( ! CHECKING ) return null ;
- return trackTxn.get();
- }
- // -- Access object members.
-
- // XXX Align to javadoc in TransactionalComponent.
-
- /* There are two lifecycles, one for write transaction, one
- * for read transactions. This affects how transaction end so
- * when/if promoted read->write transactions happen, a promoted
- * transaction will follow the write lifecycle.
- *
- * In both lifecycles, the implementer can assume that calls
- * happen at the right points and called only as needed. Framework
- * takes care of checking.
- *
- * Read lifecycle:
- * A read transaction may be just begin(READ)-end() but may also
- * have commit or abort before end. The _commitRead and _abortRead
- * calls note if an explicit commit or abort occurs but may not be
- * called. _endRead is always called exactly once.
- *
- * _commitRead
- * _abortRead
- * _endRead
- * _complete
- *
- * Write lifecycle:
- * A write transaction must have a commit() or abort() before end().
- * The framework will check this.
- *
- * If the transaction commits:
- * _commitPrepareWrite
- * _commitWrite -- The transaction is
- * _commitEndWrite
- *
- * If the transaction aborts:
- * _abortWrite
- *
- * After any lifecycle, a final call of
- * _complete()
- *
- * indicates the transaction has fully finished.
- *
- * Typically, an implementation does not need to take action in every call.
- */
-
-// /**
-// *
-// * @param readWrite
-// * @param txnId
-// * @return
-// */
-// protected abstract X _begin(ReadWrite readWrite, TxnId txnId) ;
-//
-// /**
-// *
-// * @param txnId
-// * @param state
-// * @return
-// */
-// protected abstract ByteBuffer _commitPrepareWrite(TxnId txnId, X state) ;
-//
-// /**
-// *
-// * @param txnId
-// * @param state
-// */
-// protected abstract void _commitWrite(TxnId txnId, X state) ;
-//
-// /**
-// *
-// * @param txnId
-// * @param state
-// */
-// protected abstract void _commitEndWrite(TxnId txnId, X state) ;
-//
-// /**
-// *
-// * @param txnId
-// * @param state
-// */
-// protected abstract void _abortWrite(TxnId txnId, X state) ;
-//
-// /**
-// *
-// * @param txnId
-// * @param state
-// * @return
-// */
-// protected abstract ByteBuffer _commitRead(TxnId txnId, X state) ;
-//
-// /**
-// *
-// * @param txnId
-// * @param state
-// * @return
-// */
-// protected abstract ByteBuffer _abortRead(TxnId txnId, X state) ;
-//
-// /**
-// *
-// * @param txnId
-// * @param state
-// */
-// protected abstract void _complete(TxnId txnId, X state) ;
-//
-// /**
-// *
-// */
-// protected abstract void _shutdown() ;
-
- protected abstract X _begin(ReadWrite readWrite, TxnId txnId) ;
- protected abstract X _promote(TxnId txnId, X oldState) ;
- protected abstract ByteBuffer _commitPrepare(TxnId txnId, X state) ;
- protected abstract void _commit(TxnId txnId, X state) ;
- protected abstract void _commitEnd(TxnId txnId, X state) ;
- protected abstract void _abort(TxnId txnId, X state) ;
- protected abstract void _complete(TxnId txnId, X state) ;
- protected abstract void _shutdown() ;
-
- protected ReadWrite getReadWriteMode() {
- Transaction txn = getTransaction() ;
- return txn.getMode() ;
- }
-
- protected boolean isActiveTxn() {
- TxnState txnState = getTxnState() ; // null when this thread has no transaction
- if ( txnState == null )
- return false ;
- switch(txnState) { // switch on the value that was null-checked rather than reading the state again
- case INACTIVE: case END_ABORTED: case END_COMMITTED:
- return false ;
- case ACTIVE: case DETACHED: case PREPARE: case ABORTED: case COMMIT: case COMMITTED:
- return true ;
- //null: default: return false ;
- // Get the compiler to check all states covered.
- }
- // Should not happen.
- throw new InternalErrorException("Unclear transaction state") ;
- }
-
- protected boolean isReadTxn() { return ! isWriteTxn() ; }
-
- protected boolean isWriteTxn() {
- Transaction txn = getTransaction();
- return txn.isWriteTxn() ;
- }
-
- protected void checkTxn() {
- if ( ! isActiveTxn() )
- throw new TransactionException("Not in a transaction") ;
- }
-
-// protected void requireWriteTxn() {
-// Transaction txn = getTransaction();
-// if ( txn == null )
-// throw new TransactionException("Not a transaction");
-// else
-// txn.requireWriteTxn() ;
-// }
-
- protected void checkWriteTxn() {
- Transaction txn = getTransaction();
- if ( txn == null )
- throw new TransactionException("Not a transaction");
- else
- txn.requireWriteTxn() ;
- }
-
- // -- Access object members.
-
- private TxnState getTxnState() {
- Transaction txn = getTransaction() ;
- if ( txn == null )
- return null ;
- return txn.getState() ;
- }
-
- private void checkAligned(Transaction transaction) {
- if ( ! CHECKING ) return ;
- Transaction txn = getTransaction();
- if ( txn != transaction )
- throw new TransactionException("Transaction is not the transaction of the thread") ;
- }
-
- private void checkState(TxnState expected) {
- if ( ! CHECKING ) return ;
- TxnState s = getTrackTxn() ;
- if ( s != expected )
- throw new TransactionException("Transaction is in state "+s+": expected state "+expected) ;
- }
-
- private void checkState(TxnState expected1, TxnState expected2) {
- if ( ! CHECKING ) return ;
- TxnState s = getTrackTxn() ;
- if ( s != expected1 && s != expected2 )
- throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+" or "+expected2) ;
- }
-
- // Avoid varargs ... undue worry?
- private void checkState(TxnState expected1, TxnState expected2, TxnState expected3) {
- if ( ! CHECKING ) return ;
- TxnState s = getTrackTxn() ;
- if ( s != expected1 && s != expected2 && s != expected3 )
- throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+", "+expected2+" or "+expected3) ;
- }
-
- // private void checkStateNot(State unexpected) {
-// State s = state.get();
-// if ( s == unexpected )
-// throw new TransactionException("Transaction in unexpected state "+s) ;
-// }
-
-}
-
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentWrapper.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentWrapper.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentWrapper.java
deleted file mode 100644
index a07852e..0000000
--- a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentWrapper.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.transaction.txn ;
-
-import java.nio.ByteBuffer ;
-
-/**
- * A {@link TransactionalComponent} that forwards every operation, unchanged,
- * to another {@link TransactionalComponent}. Subclasses can override individual
- * methods and reach the wrapped component through {@link #other}.
- */
-public class TransactionalComponentWrapper implements TransactionalComponent {
-    /** The component all calls are forwarded to. */
-    protected final TransactionalComponent other;
-
-    /**
-     * @param other the component to delegate to
-     */
-    public TransactionalComponentWrapper(TransactionalComponent other) {
-        this.other = other;
-    }
-
-    // ---- Recovery phase
-
-    /** Forwards to the wrapped component. */
-    @Override
-    public void startRecovery() {
-        other.startRecovery();
-    }
-
-    /** Forwards to the wrapped component. */
-    @Override
-    public void recover(ByteBuffer ref) {
-        other.recover(ref);
-    }
-
-    /** Forwards to the wrapped component. */
-    @Override
-    public void finishRecovery() {
-        other.finishRecovery();
-    }
-
-    /** Forwards to the wrapped component. */
-    @Override
-    public void cleanStart() {
-        other.cleanStart();
-    }
-
-    // ---- Identity
-
-    /** @return the component id reported by the wrapped component */
-    @Override
-    public ComponentId getComponentId() {
-        return other.getComponentId();
-    }
-
-    // ---- Transaction lifecycle
-
-    /** Forwards to the wrapped component. */
-    @Override
-    public void begin(Transaction transaction) {
-        other.begin(transaction);
-    }
-
-    /** @return whatever the wrapped component's {@code promote} returns */
-    @Override
-    public boolean promote(Transaction transaction) {
-        return other.promote(transaction);
-    }
-
-    /** @return whatever the wrapped component's {@code commitPrepare} returns */
-    @Override
-    public ByteBuffer commitPrepare(Transaction transaction) {
-        return other.commitPrepare(transaction);
-    }
-
-    /** Forwards to the wrapped component. */
-    @Override
-    public void commit(Transaction transaction) {
-        other.commit(transaction);
-    }
-
-    /** Forwards to the wrapped component. */
-    @Override
-    public void commitEnd(Transaction transaction) {
-        other.commitEnd(transaction);
-    }
-
-    /** Forwards to the wrapped component. */
-    @Override
-    public void abort(Transaction transaction) {
-        other.abort(transaction);
-    }
-
-    /** Forwards to the wrapped component. */
-    @Override
-    public void complete(Transaction transaction) {
-        other.complete(transaction);
-    }
-
-    // ---- Detach / attach
-
-    /** @return the state object produced by the wrapped component's {@code detach} */
-    @Override
-    public SysTransState detach() {
-        return other.detach();
-    }
-
-    /** Forwards to the wrapped component. */
-    @Override
-    public void attach(SysTransState systemState) {
-        other.attach(systemState);
-    }
-
-    // ---- Shutdown
-
-    /** Forwards to the wrapped component. */
-    @Override
-    public void shutdown() {
-        other.shutdown();
-    }
-
-    /** @return the wrapped component's string form, prefixed with {@code "W:"} */
-    @Override
-    public String toString() {
-        return "W:"+other.toString();
-    }
-}
-
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalMRSW.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalMRSW.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalMRSW.java
deleted file mode 100644
index 920ee05..0000000
--- a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalMRSW.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.transaction.txn;
-
-import java.nio.ByteBuffer ;
-import java.util.concurrent.locks.Lock ;
-import java.util.concurrent.locks.ReadWriteLock ;
-import java.util.concurrent.locks.ReentrantReadWriteLock ;
-
-import org.apache.jena.atlas.logging.Log ;
-
-import org.apache.jena.query.ReadWrite ;
-
-/**
- * Implementation of the component interface for {@link TransactionalComponent}
- * that guards access with a single multiple-reader / single-writer lock.
- * Useful for in-memory transactions that do not provide durability or abort (undo).
- * When retrofitting to other systems, that may be the best that can be done.
- */
-public class TransactionalMRSW extends TransactionalComponentLifecycle<Object> {
-    // MRSW implementation of TransactionMVCC
-    // XXX Update to Jena style TransactionalLock
-    // Not final: _shutdown() clears it.
-    private ReadWriteLock lock = new ReentrantReadWriteLock() ;
-
-    /**
-     * @param componentId identifier passed through to the superclass
-     */
-    public TransactionalMRSW(ComponentId componentId) {
-        super(componentId) ;
-    }
-
-    // ---- Recovery phase
-    // Nothing is journalled by this component, so recovery is a no-op.
-
-    @Override
-    public void startRecovery() {}
-
-    /** Not expected to be called; logs a warning and ignores the entry. */
-    @Override
-    public void recover(ByteBuffer ref) {
-        Log.warn(this, "Called to recover a transaction (ignored)") ;
-    }
-
-    @Override
-    public void finishRecovery() { }
-
-    @Override
-    public void cleanStart() {}
-
-    /**
-     * Choose the half of the read/write lock that matches
-     * {@code getReadWriteMode()}: the write lock for {@code WRITE},
-     * the read lock otherwise.
-     */
-    private Lock getLock() {
-        return ( ReadWrite.WRITE.equals(getReadWriteMode()) ) ? lock.writeLock() : lock.readLock() ;
-    }
-
-    /**
-     * Acquire the lock for the transaction's mode (blocking), run the matching
-     * start hook and return a fresh state object.
-     */
-    @Override
-    protected Object _begin(ReadWrite readWrite, TxnId thisTxnId) {
-        Lock txnLock = getLock() ;
-        // This is the point that makes this MRSW (readers OR writer), not MR+SW (readers and a writer)
-        txnLock.lock();
-        if ( isWriteTxn() )
-            startWriteTxn();
-        else
-            startReadTxn();
-        return createState();
-    }
-
-    private Object createState() {
-        return new Object();
-    }
-
-    /**
-     * Try to swap this transaction's read lock for the write lock without blocking
-     * on the write lock.
-     * <p>
-     * {@link ReentrantReadWriteLock} does not support upgrading: a write-lock
-     * {@code tryLock()} fails while any read lock is held, including the caller's
-     * own. The read lock is therefore released first; if the write lock cannot be
-     * taken, the read lock is re-acquired so the lock state matches the
-     * still-read transaction.
-     *
-     * @return a fresh state object on success; {@code false} (boxed) on failure
-     */
-    @Override
-    protected Object _promote(TxnId txnId, Object state) {
-        // We have a read lock, the transaction coordinator has said
-        // it's OK (from its point-of-view) to promote so this should succeed
-        // unless other readers are still active.
-        lock.readLock().unlock();
-        if ( ! lock.writeLock().tryLock() ) {
-            // Put the read lock back so a later clearup() releases a lock we really hold.
-            // This can block if a writer slipped in after the unlock above.
-            lock.readLock().lock();
-            Log.warn(this, "Failed to promote");
-            // NOTE(review): a boxed Boolean is returned as the "state" here;
-            // confirm what the superclass expects for a failed promotion.
-            return false;
-        }
-        return createState();
-    }
-
-    // Checks.
-    // Hooks for subclasses; all empty here.
-
-    protected void startReadTxn() {}
-    protected void startWriteTxn() {}
-    protected void finishReadTxn() {}
-    protected void finishWriteTxn() {}
-
-    /** Nothing to journal. */
-    @Override
-    protected ByteBuffer _commitPrepare(TxnId txnId, Object obj) {
-        return null ;
-    }
-
-    @Override
-    protected void _commit(TxnId txnId, Object obj) {
-        clearup() ;
-    }
-
-    // NOTE(review): clearup() is also called from _commit. If the superclass
-    // invokes both _commit and _commitEnd for the same transaction, the second
-    // unlock would throw IllegalMonitorStateException - confirm against
-    // TransactionalComponentLifecycle.
-    @Override
-    protected void _commitEnd(TxnId txnId, Object obj) {
-        clearup() ;
-    }
-
-    @Override
-    protected void _abort(TxnId txnId, Object obj) {
-        clearup() ;
-    }
-
-    @Override
-    protected void _complete(TxnId txnId, Object obj) {
-    }
-
-    /** Drops the reference to the lock. */
-    @Override
-    protected void _shutdown() {
-        lock = null ;
-    }
-
-    /**
-     * Run the finish hook for the transaction's mode and release the lock
-     * chosen by {@link #getLock()}. The unlock happens even if the hook throws.
-     */
-    private void clearup() {
-        Lock txnLock = getLock() ;
-        try {
-            if ( isWriteTxn() )
-                finishWriteTxn();
-            else
-                finishReadTxn();
-        } finally {
-            txnLock.unlock();
-        }
-    }
-}
-
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalSystem.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalSystem.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalSystem.java
deleted file mode 100644
index 9b64799..0000000
--- a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalSystem.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.transaction.txn;
-
-import org.apache.jena.query.ReadWrite ;
-import org.seaborne.dboe.transaction.Transactional ;
-
-/** Implementation side of a {@link Transactional}.
- * {@link Transactional} presents the application facing view
- * whereas this has all the possible steps of an implementation.
- * Normally, the implementation of {@link #commit} is split up.
- */
-public interface TransactionalSystem extends Transactional {
-
- @Override
- public default void commit() {
- commitPrepare() ;
- commitExec() ;
- }
-
- /** Do the 2-phase "prepare" step after which
- * the transaction coordinator decides whether to commit
- * or abort. A TransactionalSystem must be prepared for
- * both possibilities.
- */
- public void commitPrepare();
-
- /** Do the 2-phase "commit" step */
- public void commitExec();
-
- /** Suspend this transaction, detaching from the current thread.
- * A new transaction on this thread can performed but the detached
- * transaction still exists and if it is a write transaction
- * it can still block other write transactions.
- */
- public TransactionCoordinatorState detach() ;
-
- /**
- * Attach a transaction to this thread.
- * A transaction system implementation usually imposes a rule that
- * only one thread can have a transaction attached at a time.
- */
- public void attach(TransactionCoordinatorState coordinatorState) ;
-
- /** Get the associated {@link TransactionCoordinator} */
- public TransactionCoordinator getTxnMgr() ;
-
- /** Return the Read/Write state from the point of view of the caller.
- * Return null when not in a transaction.
- */
- public ReadWrite getState() ;
-
- @Override
- public default boolean isInTransaction() { return getState() != null ; }
-
- /** Return an information view of the transaction for this thread, if any.
- * Returns null when there is no active transaction for this tread.
- */
- public TransactionInfo getTransactionInfo() ;
-
- /** Return the transaction object for this thread.
- * Low-level use only.
- * To get information about the current transaction, call {@link #getTransactionInfo}.
- */
- public Transaction getThreadTransaction() ;
-}
-