You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2017/10/03 19:34:13 UTC

[27/65] [abbrv] jena git commit: JENA-1397: Rename java packages

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/QuorumGenerator.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/QuorumGenerator.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/QuorumGenerator.java
new file mode 100644
index 0000000..d70322d
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/QuorumGenerator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import org.apache.jena.query.ReadWrite ;
+
+/** Produces the quorum of components for a transaction at the point the transaction begins. */
+public interface QuorumGenerator {
+    ComponentGroup genQuorum(ReadWrite mode);
+}
+
+

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrBase.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrBase.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrBase.java
new file mode 100644
index 0000000..3a16534
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrBase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+
+import org.apache.jena.atlas.lib.Closeable ;
+import org.apache.jena.atlas.lib.Sync ;
+import org.apache.jena.dboe.base.file.BufferChannel;
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
+
+/** Helper class for managing the persistent state of a transactional component.
+ * The persistent state is assumed to be a fixed size, or at least
+ * of known maximum size.
+ * May not be suitable for all transactional component implementations.
+ */
+
+public abstract class StateMgrBase implements Sync, Closeable {
+    /* Compare to TransBlob which is a similar idea but where it is a
+     * component in the transaction component group. That can lead to
+     * the wrong control, or at least unclear control, of the writing during
+     * the transaction commit cycle.
+     */
+
+    private static final Logger log = LoggerFactory.getLogger(StateMgrBase.class) ;
+
+    private final BufferChannel storage ;
+    // One ByteBuffer that is reused where possible.
+    // This is short-term usage
+    // *  get disk state-> deserialize
+    // *  serialize->set disk state
+    // on a single thread (the transaction writer).
+    private ByteBuffer bb ;
+    // Is the internal state out of sync with the disk state?
+    private boolean dirty = false ;
+
+    protected StateMgrBase(BufferChannel storage, int sizeBytes) {
+        bb = ByteBuffer.allocate(sizeBytes) ;
+        this.storage = storage ;
+    }
+
+    /**
+     * After the default initial state is known, call this, for example, at the
+     * end of the constructor.  If no on-disk state is found, a clean copy is written.
+     */
+    protected void init() {
+        if ( storage.isEmpty() )
+            writeState() ;
+        else
+            readState() ;
+    }
+
+    /** Serialize the necessary state into a ByteBuffer.
+     * The argument ByteBuffer can be used or a new one returned
+     * if it is the wrong size (e.g. too small).  The returned one will become the
+     * recycled ByteBuffer. The returned ByteBuffer should have posn/limit
+     * delimiting the necessary space to write.
+     */
+    protected abstract ByteBuffer serialize(ByteBuffer bytes) ;
+
+    /**
+     * Deserialize the persistent state from the ByteBuffer (delimited by posn/limit).
+     * The byte buffer is either the recycled one, freshly read from storage,
+     * or the buffer that was passed to {@link #setState}.
+     */
+    protected abstract void deserialize(ByteBuffer bytes) ;
+
+    /** Note that the in-memory state is not known to be the same as the on-disk state. */
+    protected void setDirtyFlag() {
+        dirty = true ;
+    }
+
+    /**
+     * Event call for state writing. Called after successful
+     * writing of the state.
+     */
+    protected abstract void writeStateEvent() ;
+
+    /**
+     * Event call for state reading. Called after successful
+     * deserializing of the state.
+     */
+    protected abstract void readStateEvent() ;
+
+    /** Note that the in-memory state is the same
+     * as the on-disk state, or at least the on-disk state is
+     * acceptable for restart at any time.
+     */
+    protected void clearDirtyFlag() {
+        dirty = false ;
+    }
+
+    /** Low level control - for example, used for cloning setup. Use with care. */
+    public BufferChannel getBufferChannel() {
+        return storage ;
+    }
+
+    /** Return the serialized state using the internal ByteBuffer.
+     * Typically called by "prepare" for the bytes to write to the journal.
+     * Calls {@link #serialize}; if that returns a replacement buffer, it is
+     * adopted as the internal buffer and is the one returned.
+     * The buffer is handed back as {@code serialize} left it (it is not rewound).
+     * This method does not perform an external I/O.
+     */
+    public ByteBuffer getState() {
+        return serializeState() ;
+    }
+
+    /** Set the in-memory state from a ByteBuffer, for example, from journal recovery.
+     * This method does not perform an external I/O.
+     * Call "writeState" to put the in-memory state as the disk state.
+     */
+    public void setState(ByteBuffer buff) {
+        buff.rewind() ;
+        deserialize(buff) ;
+        dirty = true ;
+    }
+
+    // Serialize into the recycled buffer; a non-null buffer returned by serialize() replaces it.
+    private ByteBuffer serializeState() {
+        bb.rewind() ;
+        ByteBuffer out = serialize(bb) ;
+        if ( out != null )
+            bb = out ;
+        return bb ;
+    }
+
+    /** The write process : serialize, write, sync.
+     * After this, the storage has been synced; the intent is that the bytes are on disk, not in some OS cache.
+     */
+    public void writeState() {
+        serializeState() ;
+        bb.rewind() ;
+        storage.write(bb, 0) ;
+        storage.sync() ;
+        dirty = false ;
+        writeStateEvent() ;
+    }
+
+    /** The read process : get all bytes on disk, deserialize */
+    public void readState() {
+        bb.rewind() ;
+        storage.read(bb, 0) ;
+        bb.rewind() ;
+        deserialize(bb) ;
+        readStateEvent() ;
+    }
+
+    @Override
+    public void sync() {
+        if ( dirty )
+            writeState() ;
+    }
+
+    @Override
+    public void close() {
+        storage.close() ;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrData.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrData.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrData.java
new file mode 100644
index 0000000..3adc89b
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrData.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+import java.util.Arrays ;
+
+import org.apache.jena.dboe.base.file.BufferChannel;
+
+/** State management for a fixed number of long values, a common need
+ * (might as well store ints as long on disk for small numbers of integers).
+ */
+public class StateMgrData extends StateMgrBase {
+    private final long[] data ;
+
+    public StateMgrData(BufferChannel storage, long... initialData) {
+        super(storage, numBytes(initialData)) ;
+        data = copy(initialData) ;
+        super.init() ;   // the one permitted call; the override below rejects any other
+    }
+
+    @Override
+    protected void init() { throw new TransactionException("Don't call init()") ; }
+
+    private static long[] copy(long[] data) { return Arrays.copyOf(data, data.length) ; }
+    private static int numBytes(long[] data) { return data.length * Long.BYTES ; }
+
+    // Protected - leave whether to expose these operations as "public"
+    // to the subclass.  A subclass may choose instead to map these
+    // to more meaningful names, or ensure that data consistency rules are applied.
+
+    protected long[] getData() {
+        return copy(data) ;
+    }
+
+    /** Replace all the values (same count required) and mark the state dirty so that sync() writes it. */
+    protected void setData(long... newData) {
+        if ( newData.length != data.length )
+            throw new IllegalArgumentException("Expected "+data.length+" values, got "+newData.length) ;
+        System.arraycopy(newData, 0, data, 0, data.length);
+        super.setDirtyFlag() ;
+    }
+
+    protected long get(int i) {
+        return data[i] ;
+    }
+
+    protected void set(int i, long v) {
+        data[i] = v ;
+        super.setDirtyFlag() ;
+    }
+
+    @Override
+    protected ByteBuffer serialize(ByteBuffer bytes) {
+        for ( long v : data )
+            bytes.putLong(v) ;
+        return bytes ;
+    }
+
+    @Override
+    protected void deserialize(ByteBuffer bytes) {
+        for ( int i = 0 ; i < data.length ; i++ )
+            data[i] = bytes.getLong() ;
+    }
+
+    @Override
+    protected void writeStateEvent() {}
+
+    @Override
+    protected void readStateEvent() {}
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrDataIdx.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrDataIdx.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrDataIdx.java
new file mode 100644
index 0000000..0d208ab
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrDataIdx.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import org.apache.jena.dboe.base.file.BufferChannel;
+
+/** A {@link StateMgrData} whose long values are publicly readable and
+ *  writable, either as a whole array or one index at a time.
+ */
+public final class StateMgrDataIdx extends StateMgrData {
+    public StateMgrDataIdx(BufferChannel storage, long... initialData) {
+        super(storage, initialData);
+    }
+
+    // Widen the protected accessors of the superclass to public; each simply delegates.
+
+    @Override
+    public long[] getData() { return super.getData(); }
+
+    @Override
+    public void setData(long... newData) { super.setData(newData); }
+
+    @Override
+    public long get(int i) { return super.get(i); }
+
+    @Override
+    public void set(int i, long v) { super.set(i, v); }
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTrans.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTrans.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTrans.java
new file mode 100644
index 0000000..9af4b21
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTrans.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+
+/**
+ * Single-component aspect of a transaction: pairs one {@link TransactionalComponent}
+ * with the {@link Transaction} and {@link TxnId} it is taking part in.
+ */
+final class SysTrans {
+    private final TransactionalComponent component;
+    private final Transaction txn;
+    private final TxnId id;
+
+    public SysTrans(TransactionalComponent elt, Transaction transaction, TxnId txnId) {
+        this.component = elt;
+        this.txn = transaction;
+        this.id = txnId;
+    }
+
+    // Lifecycle steps. Apart from begin(), which does nothing at this level,
+    // each one forwards to the component, passing the transaction.
+
+    public void begin() { }
+    public boolean promote() { return component.promote(txn); }
+    public ByteBuffer commitPrepare() { return component.commitPrepare(txn); }
+    public void commit() { component.commit(txn); }
+    public void commitEnd() { component.commitEnd(txn); }
+    public void abort() { component.abort(txn); }
+    public void complete() { component.complete(txn); }
+
+    public Transaction getTransaction() { return txn; }
+    public TxnId getTxnId() { return id; }
+    public TransactionalComponent getComponent() { return component; }
+    public ComponentId getComponentId() { return component.getComponentId(); }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTransState.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTransState.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTransState.java
new file mode 100644
index 0000000..cb6ad9a
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTransState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+/** State held for one detached component, together with its transaction. */
+public class SysTransState {
+    private final TransactionalComponent component;
+    private final Transaction txn;
+    private final Object detachedState;
+
+    public SysTransState(TransactionalComponent elt, Transaction transaction, Object state) {
+        this.component = elt;
+        this.txn = transaction;
+        this.detachedState = state;
+    }
+
+    public TransactionalComponent getComponent() { return component; }
+    public Transaction getTransaction() { return txn; }
+    public Object getState() { return detachedState; }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/Transaction.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/Transaction.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/Transaction.java
new file mode 100644
index 0000000..fe6b2a9
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/Transaction.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import static org.apache.jena.dboe.transaction.txn.TxnState.ABORTED;
+import static org.apache.jena.dboe.transaction.txn.TxnState.ACTIVE;
+import static org.apache.jena.dboe.transaction.txn.TxnState.COMMIT;
+import static org.apache.jena.dboe.transaction.txn.TxnState.COMMITTED;
+import static org.apache.jena.dboe.transaction.txn.TxnState.DETACHED;
+import static org.apache.jena.dboe.transaction.txn.TxnState.END_ABORTED;
+import static org.apache.jena.dboe.transaction.txn.TxnState.END_COMMITTED;
+import static org.apache.jena.dboe.transaction.txn.TxnState.INACTIVE;
+import static org.apache.jena.dboe.transaction.txn.TxnState.PREPARE;
+
+import java.util.List ;
+import java.util.Objects ;
+import java.util.concurrent.atomic.AtomicReference ;
+
+import org.apache.jena.query.ReadWrite ;
+
+/**
+ * A transaction as the composition of actions on components.
+ * Works in conjunction with the TransactionCoordinator
+ * to provide the transaction lifecycle.
+ * @see TransactionCoordinator
+ * @see TransactionalComponent
+ */
+public final class Transaction implements TransactionInfo {
+    // Using an AtomicReference<TxnState> requires that
+    // TransactionalComponentLifecycle.internalComplete
+    // frees the thread local for the threadTxn, otherwise memory
+    // usage grows. If a plain member variable is used slow growth
+    // is still seen.
+    // Nulling txnMgr and clearing components stops that slow growth.
+
+    private TransactionCoordinator txnMgr ;
+    private final TxnId txnId ;
+    private final List<SysTrans> components ;
+
+    // Using an AtomicReference makes this observable from the outside.
+    // It also allows for multithreaded transactions (later).
+    private final AtomicReference<TxnState> state = new AtomicReference<>() ;
+    private long dataVersion ;
+    private ReadWrite mode ;
+
+    public Transaction(TransactionCoordinator txnMgr, TxnId txnId, ReadWrite readWrite, long dataVersion, List<SysTrans> components) {
+        Objects.requireNonNull(txnMgr, "txnMgr") ;
+        Objects.requireNonNull(txnId, "txnId") ;
+        Objects.requireNonNull(readWrite, "readWrite") ;
+        Objects.requireNonNull(components, "components") ;
+        this.txnMgr = txnMgr ;
+        this.txnId = txnId ;
+        this.mode = readWrite ;
+        this.dataVersion = dataVersion ;
+        this.components = components ;
+        setState(INACTIVE) ;
+    }
+
+    /*package*/ void resetDataVersion(long dataVersion) {
+        this.dataVersion = dataVersion;
+    }
+
+    /*package*/ void setState(TxnState newState) {
+        state.set(newState) ;
+    }
+
+    @Override
+    public TxnState getState() {
+        return state.get() ;
+    }
+
+    /**
+     * Each transaction is allocated a serialization point by the transaction
+     * coordinator. Normally, this is related to this number and it increases
+     * over time as the data changes. Two readers can have the same
+     * serialization point - they are working with the same view of the data.
+     */
+    @Override
+    public long getDataVersion() {
+        return dataVersion ;
+    }
+
+    /** Move from INACTIVE to ACTIVE, calling begin() on each component on the way. */
+    public void begin() {
+        checkState(INACTIVE);
+        components.forEach(SysTrans::begin) ;
+        setState(ACTIVE) ;
+    }
+
+    /** Ask the coordinator to promote this transaction; on success the mode becomes WRITE. */
+    public boolean promote() {
+        checkState(ACTIVE);
+        if ( ! txnMgr.promoteTxn(this) )
+            return false ;
+        mode = ReadWrite.WRITE;
+        return true ;
+    }
+
+    /*package*/ void promoteComponents() {
+        // Call back from the Transaction coordinator during promote.
+        components.forEach((c) -> {
+            if ( ! c.promote() )
+                throw new TransactionException("Failed to promote") ;
+        }) ;
+        mode = ReadWrite.WRITE ;
+    }
+
+    /** Note that an update is about to be made; a READ transaction is promoted first.
+     * @throws TransactionException if the transaction is not ACTIVE or cannot be promoted
+     */
+    public void notifyUpdate() {
+        checkState(ACTIVE) ;
+        // promote() itself switches the mode to WRITE when it succeeds.
+        if ( mode == ReadWrite.READ && ! promote() )
+            throw new TransactionException("Failed to promote") ;
+    }
+
+    /** Run the prepare step (through the coordinator for WRITE transactions) and move to PREPARE. */
+    public void prepare() {
+        checkState(ACTIVE) ;
+        if ( mode == ReadWrite.WRITE )
+            txnMgr.executePrepare(this) ;
+        setState(PREPARE);
+    }
+
+    /** Commit the transaction, running prepare() first if the caller has not already done so. */
+    public void commit() {
+        if ( getState() == ACTIVE )
+            // Auto exec prepare().
+            prepare() ;
+        checkState(PREPARE) ;
+        setState(COMMIT) ;
+        // READ and WRITE transactions currently share the same commit sequence.
+        txnMgr.executeCommit(this,
+                             () -> components.forEach(SysTrans::commit) ,
+                             () -> components.forEach(SysTrans::commitEnd) ) ;
+        setState(COMMITTED) ;
+    }
+
+    /** Abort the transaction and then run the completion step on the components. */
+    public void abort() {
+        abort$();
+        endInternal() ;
+    }
+
+    /** The abort step proper: allowed from ACTIVE or ABORTED, leaves the state ABORTED. */
+    private void abort$() {
+        // Split into READ and WRITE forms.
+        checkState(ACTIVE, ABORTED) ;
+        // Includes notification start/finish
+        txnMgr.executeAbort(this, ()-> { components.forEach((c) -> c.abort()) ; }) ;
+        setState(ABORTED) ;
+    }
+
+    /** Finish the transaction; a WRITE transaction that is still ACTIVE is an error. */
+    public void end() {
+        txnMgr.notifyEndStart(this) ;
+        if ( isWriteTxn() && getState() == ACTIVE ) {
+            throw new TransactionException("Write transaction with no commit or abort") ;
+            //Log.warn(this, "Write transaction with no commit() or abort() before end()");
+            // Just the abort process.
+            //abort$() ;
+        }
+        endInternal() ;
+        txnMgr.notifyEndFinish(this) ;
+        txnMgr = null ;
+        //components.clear() ;
+    }
+
+    /** Run the component completion step once and move to the matching END_* state. */
+    private void endInternal() {
+        if ( hasFinalised() )
+            return ;
+        // Called once, at the first abort/commit/end.
+        txnMgr.notifyCompleteStart(this);
+        components.forEach((c) -> c.complete()) ;
+        txnMgr.completed(this) ;
+        if ( getState() == COMMITTED )
+            setState(END_COMMITTED);
+        else
+            setState(END_ABORTED);
+        txnMgr.notifyCompleteFinish(this);
+    }
+
+    /*package*/ List<SysTrans> getComponents() {
+        return components ;
+    }
+
+    /*package*/ void detach() {
+        checkState(ACTIVE,PREPARE) ;
+        setState(DETACHED) ;
+    }
+
+    /*package*/ void attach() {
+        checkState(DETACHED) ;
+        setState(ACTIVE) ;
+    }
+
+    /** Check that this is an ACTIVE write transaction, throwing TransactionException otherwise. */
+    public void requireWriteTxn() {
+        checkState(ACTIVE) ;
+        if ( mode != ReadWrite.WRITE )
+            throw new TransactionException("Not a write transaction") ;
+    }
+
+    /** Has begin() been called?  True in every state other than INACTIVE. */
+    @Override
+    public boolean hasStarted()   {
+        TxnState x = getState() ;
+        return x != INACTIVE ;
+    }
+
+    /** Has the transaction committed or aborted (whether or not it has also been ended)? */
+    @Override
+    public boolean hasFinished() {
+        TxnState x = getState() ;
+        return x == COMMITTED || x == ABORTED || x == END_COMMITTED || x == END_ABORTED ;
+    }
+
+    /** Has the completion step run, i.e. is the state END_COMMITTED or END_ABORTED? */
+    @Override
+    public boolean hasFinalised() {
+        TxnState x = getState() ;
+        return x == END_COMMITTED || x == END_ABORTED ;
+    }
+
+    @Override
+    public TxnId getTxnId()     { return txnId ; }
+
+    @Override
+    public ReadWrite getMode()  { return mode ; }
+
+    /** Is this a READ transaction?
+     * Convenience operation equivalent to {@code (getMode() == READ)}
+     */
+    @Override
+    public boolean isReadTxn()  { return mode == ReadWrite.READ ; }
+
+    /** Is this a WRITE transaction?
+     * Convenience operation equivalent to {@code (getMode() == WRITE)}
+     */
+    @Override
+    public boolean isWriteTxn()  { return mode == ReadWrite.WRITE ; }
+
+    // hashCode/equality
+    // These must be object equality.  No two transactions objects are .equals unless they are ==
+
+    private void checkWriteTxn() {
+        if ( ! isActiveTxn() || ! isWriteTxn() )
+            throw new TransactionException("Not in a write transaction") ;
+    }
+
+    // XXX Duplicate -- TransactionalComponentLifecycle
+    private void checkState(TxnState expected) {
+        TxnState s = getState();
+        if ( s != expected )
+            throw new TransactionException("Transaction is in state "+s+": expected state "+expected) ;
+    }
+
+    private void checkState(TxnState expected1, TxnState expected2) {
+        TxnState s = getState();
+        if ( s != expected1 && s != expected2 )
+            throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+" or "+expected2) ;
+    }
+
+    // Avoid varargs ... undue worry?
+    private void checkState(TxnState expected1, TxnState expected2, TxnState expected3) {
+        TxnState s = getState();
+        if ( s != expected1 && s != expected2 && s != expected3 )
+            throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+", "+expected2+" or "+expected3) ;
+    }
+
+    /** True once the transaction has left the INACTIVE state; finished states count as well. */
+    @Override
+    public boolean isActiveTxn() {
+        return getState() != INACTIVE ;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
new file mode 100644
index 0000000..fc47998
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import static org.apache.jena.dboe.transaction.txn.journal.JournalEntryType.UNDO;
+import static org.apache.jena.query.ReadWrite.WRITE ;
+
+import java.nio.ByteBuffer ;
+import java.util.ArrayList ;
+import java.util.Iterator ;
+import java.util.List ;
+import java.util.Objects ;
+import java.util.concurrent.ConcurrentHashMap ;
+import java.util.concurrent.Semaphore ;
+import java.util.concurrent.atomic.AtomicLong ;
+import java.util.concurrent.locks.ReadWriteLock ;
+import java.util.concurrent.locks.ReentrantReadWriteLock ;
+
+import org.apache.jena.atlas.logging.Log ;
+import org.apache.jena.dboe.base.file.Location;
+import org.apache.jena.dboe.sys.Sys;
+import org.apache.jena.dboe.transaction.txn.journal.Journal;
+import org.apache.jena.dboe.transaction.txn.journal.JournalEntry;
+import org.apache.jena.query.ReadWrite ;
+import org.slf4j.Logger ;
+
+/**
+ * One {@code TransactionCoordinator} per group of {@link TransactionalComponent}s.
+ * {@link TransactionalComponent}s can not be shared across TransactionCoordinators.
+ * <p>
+ * This is a general engine although tested and most used for multiple-reader
+ * and single-writer (MR+SW). {@link TransactionalComponentLifecycle} provides the
+ * per-thread style.
+ * <p>
+ * Contrast to MRSW: multiple-reader or single-writer.
+ * <h3>Block writers</h3>
+ * Block until no writers are active.
+ * When this returns, this guarantees that the database is not changing
+ * and the journal is flushed to disk.
+ * <p>
+ * See {@link #blockWriters()}, {@link #enableWriters()}, {@link #execAsWriter(Runnable)}
+ * <h3>Exclusive mode</h3>
+ * Exclusive mode is when the current thread is the only active code : no readers, no writers.
+ * <p>
+ * See {@link #startExclusiveMode()}/{@link #tryExclusiveMode()} {@link #finishExclusiveMode()}, {@link #execExclusive(Runnable)}
+ *
+ * @see Transaction
+ * @see TransactionalComponent
+ * @see TransactionalSystem
+ */
+final
+public class TransactionCoordinator {
+    private static Logger log = Sys.syslog ;
+    
+    private final Journal journal ;
+    private boolean coordinatorStarted = false ;
+
+    private final ComponentGroup components = new ComponentGroup() ;
+    // Components 
+    private ComponentGroup txnComponents = null ;
+    private List<ShutdownHook> shutdownHooks ;
+    private TxnIdGenerator txnIdGenerator = TxnIdFactory.txnIdGenSimple ;
+    
+    private QuorumGenerator quorumGenerator = null ;
+    //private QuorumGenerator quorumGenerator = (m) -> components ;
+
+    // Semaphore to implement "Single Active Writer" - independent of readers
+    // This is not reentrant.
+    private Semaphore writersWaiting = new Semaphore(1, true) ;
+    
+    // All transaction need a "read" lock through out their lifetime. 
+    // Do not confuse with read/write transactions.  We need a 
+    // "one exclusive, or many other" lock which happens to be called ReadWriteLock
+    // See also {@code lock} which protects the datastructures during transaction management.  
+    private ReadWriteLock exclusivitylock = new ReentrantReadWriteLock() ;
+
+    // Coordinator wide lock object.
+    private Object coordinatorLock = new Object() ;
+
+    @FunctionalInterface
+    public interface ShutdownHook { void shutdown() ; }
+
+    /**
+     * Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s.
+     * The journal is obtained by calling {@code Journal.create(location)}.
+     *
+     * @param location passed to {@code Journal.create} to obtain the journal
+     */
+    public TransactionCoordinator(Location location) {
+        this(Journal.create(location)) ;
+    }
+    
+    /**
+     * Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s.
+     *
+     * @param journal the journal used by this coordinator
+     */
+    public TransactionCoordinator(Journal journal) {
+        this(journal, null , new ArrayList<>()) ;
+    }
+
+    /**
+     * Create a TransactionCoordinator, initially with the given {@link TransactionalComponent}s
+     * in its ComponentGroup.
+     *
+     * @param journal    the journal used by this coordinator
+     * @param components the initial components; {@code null} is treated as "none"
+     */
+    public TransactionCoordinator(Journal journal, List<TransactionalComponent> components) {
+        this(journal, components , new ArrayList<>()) ;
+    }
+
+    //    /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */ 
+//    public TransactionCoordinator(Location journalLocation) {
+//        this(Journal.create(journalLocation), new ArrayList<>() , new ArrayList<>()) ;
+//    }
+
+    /**
+     * Common constructor: records the journal, takes a private copy of the
+     * shutdown hooks and registers any initial components.
+     */
+    private TransactionCoordinator(Journal journal, List<TransactionalComponent> txnComp, List<ShutdownHook> shutdownHooks) {
+        this.journal = journal ;
+        this.shutdownHooks = new ArrayList<>(shutdownHooks) ;
+        // A null component list means "start with no components".
+        if ( txnComp == null )
+            return ;
+        for ( TransactionalComponent component : txnComp )
+            components.add(component) ;
+    }
+    
+    /**
+     * Add a {@link TransactionalComponent}.
+     * Components can only be added during setup, that is before {@link #start()}
+     * is called (and hence before recovery runs and before any transaction begins).
+     *
+     * @param elt the component to add
+     * @return this coordinator, to allow call chaining
+     * @throws TransactionException if the coordinator has already been started
+     */
+    public TransactionCoordinator add(TransactionalComponent elt) {
+        checkSetup() ;
+        synchronized(coordinatorLock) {
+            components.add(elt) ;
+        }
+        return this ;
+    }
+
+    /**
+     * Remove a {@link TransactionalComponent}, identified by its component id.
+     * Like {@code add}, this is only allowed during setup.
+     *
+     * @param elt the component to remove
+     * @return this coordinator, to allow call chaining
+     * @throws TransactionException if the coordinator has already been started
+     * @see #add
+     */
+    public TransactionCoordinator remove(TransactionalComponent elt) {
+        checkSetup() ;
+        synchronized(coordinatorLock) {
+            components.remove(elt.getComponentId()) ;
+        }
+        return this ;
+    }
+
+    /**
+     * Add a shutdown hook. Shutdown is not guaranteed to be called
+     * and hence hooks may not get called.
+     * Only allowed during setup.
+     *
+     * @param hook the hook to run from {@link #shutdown()}
+     * @throws TransactionException if the coordinator has already been started
+     */
+    public void add(TransactionCoordinator.ShutdownHook hook) {
+        checkSetup() ;
+        synchronized(coordinatorLock) {
+            shutdownHooks.add(hook) ;
+        }
+    }
+
+    /**
+     * Remove a shutdown hook. Only allowed during setup.
+     *
+     * @param hook the hook to remove
+     * @throws TransactionException if the coordinator has already been started
+     */
+    public void remove(TransactionCoordinator.ShutdownHook hook) {
+        checkSetup() ;
+        synchronized(coordinatorLock) {
+            shutdownHooks.remove(hook) ;
+        }
+    }
+    
+    /**
+     * Set the {@link QuorumGenerator} consulted when a transaction begins to choose
+     * the components taking part. With no generator (or when the generator returns
+     * {@code null}) every component of this coordinator takes part.
+     * Only allowed during setup.
+     *
+     * @param qGen the generator, or {@code null} for "all components"
+     * @throws TransactionException if the coordinator has already been started
+     */
+    public void setQuorumGenerator(QuorumGenerator qGen) {
+        checkSetup() ;
+        this.quorumGenerator = qGen ;
+    }
+
+    /**
+     * Finish setup: run journal recovery and then mark the coordinator as started.
+     * After this call the setup operations ({@code add}, {@code remove},
+     * {@code setQuorumGenerator}) throw, and {@code begin} becomes usable.
+     *
+     * @throws TransactionException if the coordinator has already been started
+     */
+    public void start() {
+        checkSetup() ;
+        recovery() ;
+        coordinatorStarted = true ;
+    }
+
+    /**
+     * Replay the journal into the registered components.
+     * An empty journal means a clean start: each component gets {@code cleanStart()}.
+     * Otherwise REDO/UNDO entries are accumulated until a COMMIT (replay the batch)
+     * or an ABORT (discard the batch) is seen; a trailing batch with neither is dropped.
+     * The journal is reset once recovery has finished.
+     */
+    private /*public*/ void recovery() {
+        Iterator<JournalEntry> journalEntries = journal.entries() ;
+        boolean journalIsEmpty = ! journalEntries.hasNext() ;
+        if ( journalIsEmpty ) {
+            components.forEachComponent(component -> component.cleanStart()) ;
+            return ;
+        }
+
+        log.info("Journal recovery start") ;
+        components.forEachComponent(component -> component.startRecovery()) ;
+
+        // Entries seen since the last COMMIT/ABORT marker.
+        List<JournalEntry> pending = new ArrayList<>() ;
+
+        while ( journalEntries.hasNext() ) {
+            JournalEntry entry = journalEntries.next() ;
+            switch(entry.getType()) {
+                case REDO : case UNDO :
+                    pending.add(entry) ;
+                    break ;
+                case COMMIT :
+                    recover(pending) ;
+                    pending.clear() ;
+                    break ;
+                case ABORT :
+                    pending.clear() ;
+                    break ;
+            }
+        }
+
+        components.forEachComponent(component -> component.finishRecovery()) ;
+        journal.reset() ;
+        log.info("Journal recovery end") ;
+    }
+
+    /**
+     * Replay one committed batch of journal entries.
+     * UNDO entries are not handled and entries for unknown components are skipped;
+     * both cases are logged as warnings.
+     */
+    private void recover(List<JournalEntry> entries) {
+        for ( JournalEntry entry : entries ) {
+            if ( entry.getType() == UNDO ) {
+                Log.warn(this, "UNDO entry : not handled") ;
+                continue ;
+            }
+            ComponentId componentId = entry.getComponentId() ;
+            ByteBuffer payload = entry.getByteBuffer() ;
+            TransactionalComponent target = components.findComponent(componentId) ;
+            if ( target == null ) {
+                Log.warn(this, "No component for "+componentId) ;
+                continue ;
+            }
+            target.recover(payload);
+        }
+    }
+
+    /**
+     * Replace the generator used to allocate a {@link TxnId} when a transaction begins.
+     * Unlike the other setters, this does not check that the coordinator is still in setup.
+     *
+     * @param generator the new transaction id generator
+     */
+    public void setTxnIdGenerator(TxnIdGenerator generator) {
+        this.txnIdGenerator = generator ;
+    }
+    
+    /** @return the journal this coordinator was created with */
+    public Journal getJournal() {
+        return journal ;
+    }
+    
+    /**
+     * Detach a transaction: calls {@code txn.detach()} and then {@code detach()} on
+     * every component of this coordinator, collecting each returned
+     * {@link SysTransState} keyed by component id.
+     *
+     * @param txn the transaction to detach
+     * @return the captured state, to be handed back to {@link #attach(TransactionCoordinatorState)}
+     */
+    public TransactionCoordinatorState detach(Transaction txn) {
+        txn.detach();
+        TransactionCoordinatorState coordinatorState = new TransactionCoordinatorState(txn) ;
+        components.forEach((id, c) -> {
+            SysTransState s = c.detach() ;
+            coordinatorState.componentStates.put(id, s) ;
+        } ) ;
+        // The txn still counts as "active" for tracking purposes below.
+        return coordinatorState ;
+    }
+
+    /**
+     * Re-attach a previously detached transaction: the transaction itself first,
+     * then each saved component state is handed back to the component with the same id.
+     *
+     * @param coordinatorState the state returned by {@link #detach(Transaction)}
+     */
+    public void attach(TransactionCoordinatorState coordinatorState) {
+        coordinatorState.transaction.attach() ;
+        coordinatorState.componentStates.forEach((componentId, savedState) -> {
+            TransactionalComponent component = components.findComponent(componentId) ;
+            component.attach(savedState);
+        });
+    }
+
+    /**
+     * Shut down the coordinator: shut down every component, run the shutdown hooks,
+     * then close the journal. A second call is a no-op.
+     * <p>
+     * A {@code null} {@code coordinatorLock} is the "has been shut down" marker
+     * tested here and in {@code checkNotShutdown}.
+     */
+    public void shutdown() {
+        if ( coordinatorLock == null )
+            return ;
+        components.forEach((id, c) -> c.shutdown()) ;
+        shutdownHooks.forEach((h)-> h.shutdown()) ;
+        // NOTE(review): once this is null, any later synchronized(coordinatorLock)
+        // in this class throws NullPointerException rather than a TransactionException.
+        coordinatorLock = null ;
+        journal.close(); 
+    }
+
+    /** Require the initialization phase: throws once {@code start()} has completed. */
+    private void checkSetup() {
+        if ( ! coordinatorStarted )
+            return ;
+        throw new TransactionException("TransactionCoordinator has already been started") ;
+    }
+
+    /** Require a coordinator that is up and running: started and not yet shut down. */
+    private void checkActive() {
+        if ( coordinatorStarted ) {
+            checkNotShutdown();
+            return ;
+        }
+        throw new TransactionException("TransactionCoordinator has not been started") ;
+    }
+
+    /** Throw if {@code shutdown()} has already run (it clears {@code coordinatorLock}). */
+    private void checkNotShutdown() {
+        boolean hasShutdown = ( coordinatorLock == null ) ;
+        if ( hasShutdown )
+            throw new TransactionException("TransactionCoordinator has been shutdown") ;
+    }
+
+    /**
+     * Give back the single writer permit.
+     * A permit that is already available means nobody holds it, which is reported
+     * as a mismatch of block/enable calls.
+     */
+    private void releaseWriterLock() {
+        if ( writersWaiting.availablePermits() != 0 )
+            throw new TransactionException("TransactionCoordinator: Probably mismatch of enable/disableWriter calls") ;
+        writersWaiting.release() ;
+    }
+    
+    /**
+     * Acquire the writer lock (the single permit of {@code writersWaiting}).
+     *
+     * @param canBlock if true, wait for the permit; if false, only take it when
+     *                 it is immediately available
+     * @return true if the permit was acquired
+     * @throws TransactionException if the thread is interrupted while waiting;
+     *         the thread's interrupt status is restored before throwing
+     */
+    private boolean acquireWriterLock(boolean canBlock) {
+        if ( ! canBlock )
+            return writersWaiting.tryAcquire() ;
+        try { 
+            writersWaiting.acquire() ; 
+            return true;
+        } catch (InterruptedException e) {
+            // acquire() clears the interrupt status when it throws; set it again
+            // so code further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt() ;
+            throw new TransactionException(e) ;
+        }
+    }
+    
+    /**
+     * Enter exclusive mode; block if necessary.
+     * There are no active transactions on return; new transactions will be held up in 'begin'.
+     * Return to normal (release waiting transactions, allow new transactions)
+     * with {@link #finishExclusiveMode}.
+     * <p>
+     * Do not call inside an existing transaction.
+     */
+    public void startExclusiveMode() {
+        startExclusiveMode(true);
+    }
+    
+    /**
+     * Try to enter exclusive mode without blocking.
+     * If return is true, then there are no active transactions on return and new transactions will be held up in 'begin'.
+     * If false, there were in-progress transactions.
+     * Return to normal (release waiting transactions, allow new transactions)
+     * with {@link #finishExclusiveMode}.
+     * <p>
+     * Do not call inside an existing transaction.
+     *
+     * @return true if exclusive mode was entered
+     */
+    public boolean tryExclusiveMode() {
+        return tryExclusiveMode(false);
+    }
+    
+    /**
+     * Try to enter exclusive mode.
+     * If return is true, then there are no active transactions on return and new transactions will be held up in 'begin'.
+     * If false, there were in-progress transactions.
+     * Return to normal (release waiting transactions, allow new transactions)
+     * with {@link #finishExclusiveMode}.
+     * <p>
+     * Do not call inside an existing transaction.
+     *
+     * @param canBlock Allow the operation to block and wait for the exclusive mode lock.
+     * @return true if exclusive mode was entered; always true when {@code canBlock} is true
+     */
+    public boolean tryExclusiveMode(boolean canBlock) {
+        return startExclusiveMode(canBlock);
+    }
+
+    /**
+     * Take the write side of the exclusivity lock.
+     * A non-blocking attempt reports whether the lock was obtained; a blocking
+     * attempt waits for the lock and always reports success.
+     */
+    private boolean startExclusiveMode(boolean canBlock) {
+        if ( ! canBlock )
+            return exclusivitylock.writeLock().tryLock() ;
+        exclusivitylock.writeLock().lock() ;
+        return true ;
+    }
+
+    /**
+     * Return to normal (release waiting transactions, allow new transactions)
+     * by releasing the write side of the exclusivity lock.
+     * Must be paired with an earlier {@link #startExclusiveMode} or a successful
+     * {@code tryExclusiveMode}.
+     */
+    public void finishExclusiveMode() {
+        exclusivitylock.writeLock().unlock() ;
+    }
+
+    /**
+     * Execute an action in exclusive mode.  This method can block.
+     * Equivalent to:
+     * <pre>
+     *  startExclusiveMode() ;
+     *  try { action.run(); }
+     *  finally { finishExclusiveMode(); }
+     * </pre>
+     * Exclusive mode is left even if the action throws.
+     *
+     * @param action the code to run while exclusive mode is held
+     */
+    public void execExclusive(Runnable action) {
+        startExclusiveMode() ;
+        try { action.run(); }
+        finally { finishExclusiveMode(); }
+    }
+    
+    /**
+     * Block until no writers are active, by taking the single writer permit.
+     * When this returns, this guarantees that the database is not changing
+     * and the journal is flushed to disk.
+     * <p>
+     * The application must call {@link #enableWriters} later.
+     * <p>
+     * This operation must not be nested (it will block).
+     *
+     * @throws TransactionException if the thread is interrupted while waiting
+     * @see #tryBlockWriters()
+     * @see #enableWriters()
+     */
+    public void blockWriters() {
+        acquireWriterLock(true) ;
+    }
+
+    /**
+     * Try to block all writers, or return if that is not possible at the moment.
+     * <p>
+     * Unlike a write transaction, there is no associated transaction.
+     * <p>
+     * If it returns true, the application must call {@link #enableWriters} later.
+     *
+     * @return true if the operation succeeded and writers are blocked
+     * @see #blockWriters()
+     * @see #enableWriters()
+     */
+    public boolean tryBlockWriters() {
+        return tryBlockWriters(false) ;
+    }
+
+    /**
+     * Block writers, either waiting until no writer is active or returning
+     * immediately if that is not possible at the moment.
+     * <p>
+     * Unlike a write transaction, there is no associated transaction.
+     * <p>
+     * If it returns true, the application must call {@link #enableWriters} later.
+     *
+     * @param canBlock if true, wait for the writer permit; if false, do not wait
+     * @return true if the operation succeeded and writers are blocked
+     */
+    public boolean tryBlockWriters(boolean canBlock) {
+        return acquireWriterLock(canBlock) ;
+    }
+    /**
+     * Allow writers again by giving back the writer permit.
+     * This must be used in conjunction with {@link #blockWriters()} or {@link #tryBlockWriters()}.
+     *
+     * @throws TransactionException if the writer permit is not currently taken
+     * @see #blockWriters()
+     * @see #tryBlockWriters()
+     */
+    public void enableWriters() {
+        releaseWriterLock();
+    }
+    
+    /**
+     * Execute an action as if it were a writer, but with no write transaction started.
+     * This method can block.
+     * <p>
+     * Equivalent to:
+     * <pre>
+     *  blockWriters() ;
+     *  try { action.run(); }
+     *  finally { enableWriters(); }
+     * </pre>
+     * Writers are re-enabled even if the action throws.
+     *
+     * @param action the code to run while writers are blocked
+     */
+    public void execAsWriter(Runnable action) {
+        blockWriters() ;
+        try { action.run(); }
+        finally { enableWriters(); }
+    }
+    
+    /**
+     * Start a transaction. This may block.
+     * Equivalent to {@code begin(readWrite, true)}.
+     *
+     * @param readWrite the transaction mode
+     * @return the new transaction
+     */
+    public Transaction begin(ReadWrite readWrite) {
+        return begin(readWrite, true) ;
+    }
+    
+    /**
+     * Start a transaction.
+     * Every transaction first takes the shared ("read") side of the exclusivity
+     * lock, so it waits - or, if {@code canBlock} is false, gives up - while
+     * exclusive mode is in force.
+     * A single writer policy is currently imposed, so a {@code begin(WRITE)}
+     * additionally waits for, or gives up on, the writer permit.
+     *
+     * @param readWrite the transaction mode; must not be {@code null}
+     * @param canBlock  whether this call may wait for the locks it needs
+     * @return the new transaction, or {@code null} if {@code canBlock} is false
+     *         and the operation would have had to block
+     * @throws NullPointerException if {@code readWrite} is {@code null}
+     * @throws TransactionException if the coordinator has not been started or has been shut down
+     */
+    public Transaction begin(ReadWrite readWrite, boolean canBlock) {
+        // Reject a null mode before any lock is taken. (Objects.nonNull merely
+        // returns a boolean, so it never rejected anything.)
+        Objects.requireNonNull(readWrite, "readWrite") ;
+        checkActive() ;
+        
+        // XXX Flag to bounce writers for long term "block writers"
+        if ( false /* bounceWritersAtTheMoment */) {
+            if ( readWrite == WRITE ) {
+                throw new TransactionException("Writers currently being rejected");
+            }
+        }
+        
+        // Held for the lifetime of the transaction; released in finishActiveTransaction.
+        if ( canBlock )
+            exclusivitylock.readLock().lock() ;
+        else {
+            if ( ! exclusivitylock.readLock().tryLock() )
+                return null ;
+        }
+        
+        // Readers are never held up by the writer permit.
+        if ( readWrite == WRITE ) {
+            // Writers take a WRITE permit from the semaphore to ensure there
+            // is at most one active writer, else the attempt to start the
+            // transaction blocks.
+            // Released in notifyCommitFinish/notifyAbortFinish
+            boolean b = acquireWriterLock(canBlock) ;
+            if ( !b ) {
+                // Could not become the writer: undo the shared lock taken above.
+                exclusivitylock.readLock().unlock() ;
+                return null ;
+            }
+        }
+        Transaction transaction = begin$(readWrite) ;
+        startActiveTransaction(transaction) ;
+        transaction.begin();
+        return transaction;
+    }
+    
+    // The version is the serialization point for a transaction.
+    // All transactions on the same view of the data get the same serialization point.
+    
+    // A read transaction can be promoted if writer does not start
+    // This TransactionCoordinator provides Serializable, Read-lock-free
+    // execution.  With no item locking, a read can only be promoted
+    // if no writer started since the reader started.
+
+    /* The version of the data - incremented when a write transaction commits.
+     * This is the version with respect to the last committed transaction.
+     * Aborts do not cause the data version to advance.
+     * This counter never goes backwards.
+     */
+    private final AtomicLong dataVersion = new AtomicLong(0) ;
+    
+    /**
+     * The part of 'begin' that runs under {@code coordinatorLock}: allocate the
+     * transaction id, create the {@link Transaction} against the current data
+     * version, choose the participating components, create a {@link SysTrans}
+     * for each and call {@code begin} on each.
+     *
+     * @param readWrite the transaction mode
+     * @return the new transaction (not yet registered as active)
+     */
+    private Transaction begin$(ReadWrite readWrite) {
+        synchronized(coordinatorLock) {
+            // Thread safe part of 'begin'
+            // Allocate the transaction serialization point.
+            TxnId txnId = txnIdGenerator.generate() ;
+            // The Transaction keeps this list; it is filled in below.
+            List<SysTrans> sysTransList = new ArrayList<>() ;
+            Transaction transaction = new Transaction(this, txnId, readWrite, dataVersion.get(), sysTransList) ;
+            
+            // Local variable: hides the field of the same name.
+            ComponentGroup txnComponents = chooseComponents(this.components, readWrite) ;
+            
+            try {
+                txnComponents.forEachComponent(elt -> {
+                    SysTrans sysTrans = new SysTrans(elt, transaction, txnId) ;
+                    sysTransList.add(sysTrans) ; }) ;
+                // Calling each component must be inside the lock
+                // so that a transaction does not commit overlapping with setup.
+                // If it did, different components might end up starting from
+                // different start states of the overall system.
+                txnComponents.forEachComponent(elt -> elt.begin(transaction)) ;
+            } catch(Throwable ex) {
+                // Careful about incomplete.
+                // Currently no clean-up is done here; the exception is rethrown as-is.
+                //abort() ;
+                //complete() ;
+                throw ex ;
+            }
+            return transaction ;
+        }
+    }
+    
+    /**
+     * Choose the components that take part in a new transaction.
+     * Without a quorum generator, or when the generator returns {@code null},
+     * this is the full component group. Otherwise it is the generated group;
+     * each of its members is compared with the coordinator's component of the
+     * same id and a warning is logged for any that is missing or different.
+     *
+     * @param components the coordinator's full component group
+     * @param readWrite  the mode of the transaction being started
+     * @return the group of components for the transaction
+     */
+    private ComponentGroup chooseComponents(ComponentGroup components, ReadWrite readWrite) {
+        if ( quorumGenerator == null )
+            return components ;
+        ComponentGroup cg = quorumGenerator.genQuorum(readWrite) ;
+        if ( cg == null )
+            return components ;
+        cg.forEach((id, c) -> {
+            TransactionalComponent tcx = components.findComponent(id) ;
+            // findComponent can return null for an id this coordinator does not
+            // know (recover() checks for that). That is exactly the case to warn
+            // about, so it must not be dereferenced.
+            if ( tcx == null || ! tcx.equals(c) )
+                log.warn("TransactionalComponent not in TransactionCoordinator's ComponentGroup") ; 
+        }) ;
+        if ( log.isDebugEnabled() )
+            log.debug("Custom ComponentGroup for transaction "+readWrite+": size="+cg.size()+" of "+components.size()) ;
+        return cg ;
+    }
+
+    /** Is promotion of transactions enabled? */ 
+    /*private*/public/*for development*/ static boolean promotion               = true ;
+    
+    /** Control of whether a transaction promotion can see any commits that
+     *  happened between this transaction starting and it promoting.
+     *  A form of "ReadCommitted".   
+     */
+    /*private*/public/*for development*/ static boolean readCommittedPromotion  = false ;
+    
+    /** Whether to wait for writers when trying to promote */
+    private static final boolean promotionWaitForWriters = true;
+
+    /**
+     * Attempt to promote a transaction from READ to WRITE.
+     * No-op (returning true) for a transaction that is already a writer.
+     * <p>
+     * On success the calling transaction holds the writer permit. On every
+     * failure path below the permit, if it was taken, is released again.
+     *
+     * @param transaction the transaction to promote
+     * @return true if the transaction is a writer on return; false if promotion
+     *         is disabled or could not be done
+     */
+    /*package*/ boolean promoteTxn(Transaction transaction) {
+        if ( ! promotion )
+            return false;
+
+        if ( transaction.getMode() == WRITE )
+            return true ;
+        
+        // Has there been a writer active since the transaction started?
+        // Do a test outside the lock - only dataVersion can change and that increases.
+        // If "read committed" promotion is not allowed, the data has changed in a way we
+        // do not wish to expose.
+        // If this test fails outside the lock it will fail inside.
+        // If it passes, we have to test again in case there is an active writer.
+        
+        if ( ! readCommittedPromotion ) {
+            long txnEpoch = transaction.getDataVersion() ;      // The transaction-start point.
+            long currentEpoch = dataVersion.get() ;             // The data serialization point.
+            
+            if ( txnEpoch < currentEpoch )
+                // The data has changed and "read committed" not allowed.
+                // We can reject now.
+                return false ;
+        }
+        
+        // Once we have acquireWriterLock, we are single writer.
+        // We may have to discard writer status because, once we can make the definite
+        // decision on promotion, we find we can't promote after all.
+        if ( readCommittedPromotion ) {
+            /*
+             * acquireWriterLock(true) ;
+             * synchronized(coordinatorLock) {
+             * begin$ ==>
+             *    reset transaction.
+             *    promote components
+             *    reset dataVersion
+             */
+            acquireWriterLock(true) ;
+            synchronized(coordinatorLock) {
+                try { 
+                    transaction.promoteComponents() ;
+                    // Because we want to see the new state of the data.
+                    //transaction.resetDataVersion(dataVersion.get());
+                } catch (TransactionException ex) {
+                    // Best effort abort; any failure of the abort itself is deliberately ignored.
+                    try { transaction.abort(); } catch(RuntimeException ex2) {}
+                    releaseWriterLock();
+                    return false ;
+                }
+            }
+            return true;
+        }
+        
+        if ( ! waitForWriters() )
+            // Failed to become a writer.
+            return false;
+        // Now a proto-writer.
+        
+        synchronized(coordinatorLock) {
+            // Not read committed.
+            // Need to check the data version once we are the writer and all previous
+            // writers have committed or aborted.
+            // Has there been a writer active since the transaction started?
+            long txnEpoch = transaction.getDataVersion() ;    // The transaction-start point.
+            long currentEpoch = dataVersion.get() ;         // The data serialization point.
+
+            if ( txnEpoch != currentEpoch ) {
+                // Failed to promote.
+                releaseWriterLock();
+                return false ;
+            }
+            
+            // ... we have now got the writer lock ...
+            try { 
+                transaction.promoteComponents() ;
+                // No need to reset the data version because strict isolation. 
+            } catch (TransactionException ex) {
+                // Best effort abort; any failure of the abort itself is deliberately ignored.
+                try { transaction.abort(); } catch(RuntimeException ex2) {}
+                releaseWriterLock();
+                return false ;
+            }
+        }
+        return true ;
+    }
+        
+    /**
+     * Try to become the writer during promotion; whether this waits for the
+     * writer permit is decided by {@code promotionWaitForWriters}.
+     */
+    private boolean waitForWriters() {
+        return acquireWriterLock(promotionWaitForWriters) ;
+    }
+
+    // Called once by Transaction after the action of commit()/abort() or end()
+    /**
+     * Signal that the transaction has finished: remove it from the active
+     * transaction tracking and then reset the journal.
+     *
+     * @param transaction the transaction that has finished
+     */
+    /*package*/ void completed(Transaction transaction) {
+        finishActiveTransaction(transaction);
+        journal.reset() ;
+    }
+
+    /**
+     * Run the prepare step of a commit: ask each component of the transaction for
+     * its prepare data and write every non-null result to the journal.
+     * Done by the coordinator because it needs access to the journal.
+     */
+    /*package*/ void executePrepare(Transaction transaction) {
+        notifyPrepareStart(transaction);
+        transaction.getComponents().forEach(sysTrans -> {
+            TransactionalComponent component = sysTrans.getComponent() ;
+            ByteBuffer prepared = component.commitPrepare(transaction) ;
+            // A component with nothing to record returns null.
+            if ( prepared == null )
+                return ;
+            PrepareState state = new PrepareState(component.getComponentId(), prepared) ;
+            journal.write(state) ;
+        }) ;
+        notifyPrepareFinish(transaction);
+    }
+
+    /**
+     * Perform the commit of a transaction under {@code coordinatorLock}.
+     * The journal sync is the commit point; after it the supplied commit action
+     * is run, the journal is truncated, the finish action is run and, for a
+     * WRITE transaction, the data version is advanced and the writer permit is
+     * released (via {@code notifyCommitFinish}).
+     *
+     * @param transaction the transaction being committed
+     * @param commit      the transaction's commit actions
+     * @param finish      run after the commit actions to tell the transaction it has finished
+     */
+    /*package*/ void executeCommit(Transaction transaction,  Runnable commit, Runnable finish) {
+        // This is the commit point. 
+        synchronized(coordinatorLock) {
+            // *** COMMIT POINT
+            journal.sync() ;
+            // *** COMMIT POINT
+            // Now run the Transactions commit actions. 
+            commit.run() ;
+            journal.truncate(0) ;
+            // and tell the Transaction it's finished. 
+            finish.run() ;
+            // Bump global serialization point if necessary.
+            if ( transaction.getMode() == WRITE )
+                advanceDataVersion() ;
+            notifyCommitFinish(transaction) ;
+        }
+    }
+    
+    // Inside the global transaction start/commit lock.
+    /** Move the data version on by one; called from {@code executeCommit} for WRITE transactions. */
+    private void advanceDataVersion() {
+        dataVersion.incrementAndGet();
+    }
+    
+    /**
+     * Run the abort action of a transaction, bracketed by the abort notifications.
+     * For a WRITE transaction the writer permit is released afterwards
+     * (via {@code notifyAbortFinish}).
+     *
+     * @param transaction the transaction being aborted
+     * @param abort       the transaction's abort actions
+     */
+    /*package*/ void executeAbort(Transaction transaction, Runnable abort) {
+        notifyAbortStart(transaction) ;
+        abort.run();
+        notifyAbortFinish(transaction) ;
+    }
+    
+    // Active transactions: this is (the missing) ConcurrentHashSet
+    private final static Object dummy                   = new Object() ;    
+    private ConcurrentHashMap<Transaction, Object> activeTransactions = new ConcurrentHashMap<>() ;
+    private AtomicLong activeTransactionCount = new AtomicLong(0) ;
+    private AtomicLong activeReadersCount = new AtomicLong(0) ;
+    private AtomicLong activeWritersCount = new AtomicLong(0) ;
+    
+    /**
+     * Record a newly begun transaction: bump the begin counters and the
+     * active counters for its mode, and add it to the set of active transactions.
+     */
+    private void startActiveTransaction(Transaction transaction) {
+        synchronized(coordinatorLock) {
+            // Use lock to ensure all the counters move together.
+            // Thread safe - we have not let the Transaction object out yet.
+            countBegin.incrementAndGet() ;
+            switch(transaction.getMode()) {
+                case READ:  countBeginRead.incrementAndGet() ;  activeReadersCount.incrementAndGet() ; break ;
+                case WRITE: countBeginWrite.incrementAndGet() ; activeWritersCount.incrementAndGet() ; break ;
+            }
+            activeTransactionCount.incrementAndGet() ;
+            activeTransactions.put(transaction, dummy) ;
+        }
+    }
+    
+    /**
+     * Remove a transaction from the active set, adjust the counters and release
+     * the shared side of the exclusivity lock that was taken in 'begin'.
+     * A transaction that is no longer in the active set is ignored entirely,
+     * so the lock is released only once per transaction.
+     */
+    private void finishActiveTransaction(Transaction transaction) {
+        synchronized(coordinatorLock) {
+            // Idempotent.
+            Object x = activeTransactions.remove(transaction) ;
+            if ( x == null )
+                return ;
+            countFinished.incrementAndGet() ;
+            activeTransactionCount.decrementAndGet() ;
+            switch(transaction.getMode()) {
+                case READ:  activeReadersCount.decrementAndGet() ; break ;
+                case WRITE: activeWritersCount.decrementAndGet() ; break ;
+            }
+        }
+        exclusivitylock.readLock().unlock() ; 
+    }
+    
+    public long countActiveReaders()    { return activeReadersCount.get() ; } 
+    public long countActiveWriter()     { return activeWritersCount.get() ; } 
+    public long countActive()           { return activeTransactionCount.get(); }
+    
+    // notify*Start/Finish called round each transaction lifecycle step
+    // Called in cooperation between Transaction and TransactionCoordinator
+    // depending on who is actually do the work of each step.
+
+    /** Hook called by {@code executePrepare} before the components prepare; currently a no-op. */
+    /*package*/ void notifyPrepareStart(Transaction transaction) {}
+
+    /** Hook called by {@code executePrepare} after the components have prepared; currently a no-op. */
+    /*package*/ void notifyPrepareFinish(Transaction transaction) {}
+
+    // Writers released here - can happen because of commit() or abort(). 
+
+    /** Commit-start hook; a no-op, and not currently called from anywhere in this class. */
+    private void notifyCommitStart(Transaction transaction) {}
+    
+    /** After a commit: a WRITE transaction gives back the writer permit. */
+    private void notifyCommitFinish(Transaction transaction) {
+        if ( transaction.getMode() != WRITE )
+            return ;
+        releaseWriterLock();
+    }
+    
+    /** Hook called by {@code executeAbort} before the abort action runs; currently a no-op. */
+    private void notifyAbortStart(Transaction transaction) { }
+    
+    /** After an abort: a WRITE transaction gives back the writer permit. */
+    private void notifyAbortFinish(Transaction transaction) {
+        if ( transaction.getMode() != WRITE )
+            return ;
+        releaseWriterLock();
+    }
+
+    /** End-start lifecycle hook; currently a no-op. Not called from within this class. */
+    /*package*/ void notifyEndStart(Transaction transaction) { }
+
+    /** End-finish lifecycle hook; currently a no-op. Not called from within this class. */
+    /*package*/ void notifyEndFinish(Transaction transaction) {}
+
+    // Called by Transaction once at the end of first commit()/abort() or end()
+    
+    /** Complete-start lifecycle hook; currently a no-op. Not called from within this class. */
+    /*package*/ void notifyCompleteStart(Transaction transaction) { }
+
+    /** Complete-finish lifecycle hook; currently a no-op. Not called from within this class. */
+    /*package*/ void notifyCompleteFinish(Transaction transaction) { }
+
+    // Coordinator state.
+    private final AtomicLong countBegin         = new AtomicLong(0) ;
+
+    private final AtomicLong countBeginRead     = new AtomicLong(0) ;
+
+    private final AtomicLong countBeginWrite    = new AtomicLong(0) ;
+
+    private final AtomicLong countFinished      = new AtomicLong(0) ;
+
+    // Access counters
+    public long countBegin()        { return countBegin.get() ; }
+
+    public long countBeginRead()    { return countBeginRead.get() ; }
+
+    public long countBeginWrite()   { return countBeginWrite.get() ; }
+
+    public long countFinished()     { return countFinished.get() ; }
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinatorState.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinatorState.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinatorState.java
new file mode 100644
index 0000000..5f0755f
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinatorState.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.util.HashMap ;
+import java.util.Map ;
+
+/** Pairs a {@link Transaction} with a map of {@link SysTransState} keyed by {@link ComponentId}. */
+public class TransactionCoordinatorState {
+    /*package*/ final Transaction transaction ;
+    /*package*/ Map<ComponentId, SysTransState> componentStates ;
+    /*package*/ TransactionCoordinatorState(Transaction transaction) {
+        this.transaction = transaction ;
+        this.componentStates = new HashMap<>() ;
+    }
+    /** @return the transaction supplied to the constructor. */
+    public Transaction getTransaction() { return this.transaction ; }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionException.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionException.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionException.java
new file mode 100644
index 0000000..2891cf7
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import org.apache.jena.sparql.JenaTransactionException ;
+
+/** Exception type of this package; every constructor forwards its arguments to {@link JenaTransactionException}. */
+public class TransactionException extends JenaTransactionException {
+    public TransactionException(String message, Throwable cause)   { super(message, cause) ; }
+    public TransactionException(String message)                    { super(message) ; }
+    public TransactionException(Throwable cause)                   { super(cause) ; }
+    public TransactionException()                                  { super() ; }
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionInfo.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionInfo.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionInfo.java
new file mode 100644
index 0000000..e0ab792
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionInfo.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import static org.apache.jena.dboe.transaction.txn.TxnState.INACTIVE;
+
+import org.apache.jena.query.ReadWrite ;
+
+/**
+ * A view that provides information about a transaction.
+ * @see Transaction
+ */
+public interface TransactionInfo {
+
+    /** The transaction lifecycle state. */
+    TxnState getState() ;
+
+    /**
+     * Each transaction is allocated a serialization point by the transaction
+     * coordinator. Normally, the data version is related to that point and it
+     * increases over time as the data changes. Two readers can have the same
+     * serialization point - they are working with the same view of the data.
+     */
+    long getDataVersion() ;
+
+    /** Has the transaction started? */
+    boolean hasStarted() ;
+
+    /** Has the transaction finished (has commit/abort/end been called)? */
+    boolean hasFinished() ;
+
+    /** Has the transaction gone through all lifecycle states? */
+    boolean hasFinalised() ;
+
+    /** Get the transaction id for this transaction. Unique within this OS process (JVM) at least. */
+    TxnId getTxnId() ;
+
+    /** What mode is this transaction?
+     *  This may change from {@code READ} to {@code WRITE} in a transaction's lifetime.
+     */
+    ReadWrite getMode() ;
+
+    /** Is this a view of a READ transaction?
+     * Convenience operation equivalent to {@code (getMode() == READ)}
+     */
+    default boolean isReadTxn() {
+        return ReadWrite.READ == getMode() ;
+    }
+    /** Is this a view of a WRITE transaction?
+     * Convenience operation equivalent to {@code (getMode() == WRITE)}
+     */
+    default boolean isWriteTxn() {
+        return ReadWrite.WRITE == getMode() ;
+    }
+    /** Is this a view of a transaction that is active?
+     * Equivalent to {@code getState() != INACTIVE}
+     */
+    default boolean isActiveTxn()  { return ! ( getState() == INACTIVE ) ; }
+
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java
new file mode 100644
index 0000000..30364d0
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.util.Objects ;
+
+import org.apache.jena.atlas.logging.Log ;
+import org.apache.jena.query.ReadWrite ;
+
+/**
+ * Framework for implementing a Transactional: the current transaction is held
+ * per thread and lifecycle operations are delegated to a {@link TransactionCoordinator}.
+ */
+public class TransactionalBase implements TransactionalSystem {
+    // Optional label (may be null) prefixed to some exception messages to help debugging.
+    private final String label ;
+    protected boolean isShutdown = false ;
+    protected final TransactionCoordinator txnMgr ;
+
+    // Per thread transaction.
+    private final ThreadLocal<Transaction> theTxn = new ThreadLocal<>() ;
+    /** Create with a label ({@code null} for none) and the coordinator that operations are delegated to. */
+    public TransactionalBase(String label, TransactionCoordinator txnMgr) {
+        this.label = label ;
+        this.txnMgr = txnMgr ;
+    }
+    /** Create without a label. */
+    public TransactionalBase(TransactionCoordinator txnMgr) {
+        this(null, txnMgr) ;
+    }
+
+    @Override
+    public TransactionCoordinator getTxnMgr() {
+        return txnMgr ;
+    }
+
+    // Development aid: when true, attach/detach log their progress.
+    private static final boolean trackAttachDetach = false ;
+    /** Detach the calling thread's transaction and return its coordinator state; throws {@link TransactionException} if nothing was detached. */
+    @Override
+    public TransactionCoordinatorState detach() {
+        if ( trackAttachDetach )
+            Log.info(this,  ">> detach");
+        checkRunning() ;
+        // Not if it just committed but before end.
+        //checkActive() ;
+        Transaction txn = theTxn.get() ;
+        TransactionCoordinatorState coordinatorState = null ;
+        if ( txn != null )
+            // We are not ending.
+            coordinatorState = txnMgr.detach(txn) ;
+        if ( trackAttachDetach )
+            Log.info(this,  "  theTxn = "+txn) ;
+        theTxn.remove() ; // The calling thread no longer has a transaction, whether or not one was detached.
+        if ( trackAttachDetach )
+            Log.info(this,  "<< detach");
+        if ( coordinatorState == null )
+            throw new TransactionException("Not attached") ;
+        return coordinatorState ;
+    }
+    /** Attach a detached transaction to the calling thread, which must not be in a transaction; {@code null} is rejected with a NullPointerException. */
+    @Override
+    public void attach(TransactionCoordinatorState coordinatorState) {
+        if ( trackAttachDetach )
+            Log.info(this,  ">> attach");
+        Objects.requireNonNull(coordinatorState, "coordinatorState") ;
+        checkRunning() ;
+        checkNotActive() ;
+        TxnState txnState = coordinatorState.transaction.getState() ;
+        if ( txnState != TxnState.DETACHED )
+            throw new TransactionException("Not a detached transaction") ;
+        txnMgr.attach(coordinatorState) ;
+        if ( trackAttachDetach )
+            Log.info(this,  "  theTxn = "+coordinatorState.transaction) ;
+        theTxn.set(coordinatorState.transaction);
+        if ( trackAttachDetach )
+            Log.info(this,  "<< attach");
+    }
+    /** Begin a transaction for the calling thread, which must not be in one already; {@code null} is rejected with a NullPointerException. */
+    @Override
+    public final void begin(ReadWrite readWrite) {
+        Objects.requireNonNull(readWrite, "readWrite") ;
+        checkRunning() ;
+        checkNotActive() ;
+        Transaction transaction = txnMgr.begin(readWrite) ;
+        theTxn.set(transaction) ;
+    }
+
+    @Override
+    public boolean promote() {
+        checkActive() ;
+        Transaction txn = getValidTransaction() ;
+        return txn.promote() ;
+    }
+
+    @Override
+    public final void commit() {
+        checkRunning() ;
+        TransactionalSystem.super.commit() ;
+    }
+
+    @Override
+    public void commitPrepare() {
+        Transaction txn = getValidTransaction() ;
+        txn.prepare() ;
+    }
+    /** Commit the calling thread's transaction and then end it (clearing the thread's transaction). */
+    @Override
+    public void commitExec() {
+        Transaction txn = getValidTransaction() ;
+        txn.commit() ;
+        _end() ;
+    }
+
+//    /** Signal end of commit phase */
+//    @Override
+//    public void commitEnd() {
+//        _end() ;
+//    }
+    /** Abort the calling thread's transaction; the transaction is ended even if the abort throws. */
+    @Override
+    public final void abort() {
+        checkRunning() ;
+        checkActive() ;
+        Transaction txn = getValidTransaction() ;
+        try { txn.abort() ; }
+        finally { _end() ; }
+    }
+
+    @Override
+    public final void end() {
+        checkRunning() ;
+        // Don't check if active or if any thread locals exist
+        // because this may have already been called.
+        // txn.get() ; -- may be null -- test repeat calls.
+        _end() ;
+    }
+
+    /**
+     * Return the Read/write state (or null when not in a transaction)
+     */
+    @Override
+    public final ReadWrite getState() {
+        checkRunning() ;
+        // Tricky - ThreadLocal.get() creates a per-thread entry as a side effect,
+        // so when there is no transaction the entry is removed again below.
+        Transaction txn = theTxn.get() ;
+        if ( txn != null )
+            return txn.getMode() ;
+        theTxn.remove() ;
+        return null ;
+    }
+
+    /** The calling thread's transaction as a {@link TransactionInfo} view, or {@code null} if there is none. */
+    @Override
+    public final TransactionInfo getTransactionInfo() {
+        return getThreadTransaction() ;
+    }
+
+    /** The calling thread's transaction, or {@code null} if there is none. */
+    @Override
+    public final Transaction getThreadTransaction() {
+        Transaction txn = theTxn.get() ;
+        // Touched the thread local so it is defined now.
+//        if ( txn == null )
+//            theTxn.remove() ;
+        return txn ;
+    }
+
+    /** Get the transaction, checking there is one */
+    private Transaction getValidTransaction() {
+        Transaction txn = theTxn.get() ;
+        if ( txn == null )
+            throw new TransactionException("Not in a transaction") ;
+        return txn ;
+    }
+
+    private void checkRunning() {
+//        if ( ! hasStarted )
+//            throw new TransactionException("Not started") ;
+
+        if ( isShutdown )
+            throw new TransactionException("Shutdown") ;
+    }
+
+    /**
+     * Shutdown component, aborting any in-progress transactions. This operation
+     * is not guaranteed to be called.
+     */
+    public void shutdown() {
+        txnMgr.shutdown() ;
+        isShutdown = true ;
+    }
+
+    protected String label(String msg) {
+        if ( label == null )
+            return msg ;
+        return label+": "+msg ;
+    }
+
+    /** Throws {@link TransactionException} if shut down or if {@code isInTransaction()} is false. */
+    protected final void checkActive() {
+        checkNotShutdown() ;
+        if ( ! isInTransaction() )
+            throw new TransactionException(label("Not in an active transaction")) ;
+    }
+
+    /** Throws {@link TransactionException} if shut down or if {@code isInTransaction()} is true. */
+    protected final void checkNotActive() {
+        checkNotShutdown() ;
+        if ( isInTransaction() )
+            throw new TransactionException(label("Currently in an active transaction")) ;
+    }
+
+    /** Throws {@link TransactionException} if {@link #shutdown()} has been called. */
+    protected final void checkNotShutdown() {
+        if ( isShutdown )
+            throw new TransactionException(label("Already shutdown")) ;
+    }
+    /** End the calling thread's transaction, if there is one, and clear the thread's transaction. */
+    private final void _end() {
+        Transaction txn = theTxn.get() ;
+        if ( txn != null ) {
+            try {
+                // Can throw an exception on begin(W)...end().
+                txn.end() ;
+            } finally {
+                // remove() alone clears the thread's entry, even when end() throws.
+                theTxn.remove() ;
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponent.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponent.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponent.java
new file mode 100644
index 0000000..46253fe
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponent.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+
+/** Interface for components of a transaction system.
+* <p><br/>
+* The {@link TransactionCoordinator} manages a number of components
+* which provide the {@link TransactionalComponent} interface.
+* <p><br/>
+* When a new coordinator starts, typically being when the in-process system starts,
+* there is a recovery phase when work from a previous coordinator is recovered.
+* Transactions either were properly committed by the previous coordinator,
+* and hence redo actions (finalization) should be done,
+* or they were not, in which case undo actions may be needed.
+* Transactions to discard are not notified; only fully committed transactions are
+* notified during recovery. The component may need to keep its own record of
+* undo actions needed across restarts.
+* <p><br/>
+* Lifecycle of startup:
+* <ul>
+* <li>{@link #startRecovery}
+* <li>{@link #recover} for each committed/durable transaction (redo actions)
+* <li>{@link #finishRecovery}, discarding any other transactions (undo actions).
+* </ul>
+* <p><br/>
+* Lifecycle of a read transaction:
+* <ul>
+* <li>{@link #begin}
+* <li>{@link #complete}
+* </ul>
+* <br/>
+* A read transaction may also include {@code commit} or {@code abort} lifecycles.
+* {@link #commitPrepare} and {@link #commitEnd} are not called.
+* <p><br/>
+* Lifecycle of a write transaction:
+* <ul>
+* <li>{@link #begin}
+* <li>{@link #commitPrepare}
+* <li>{@link #commit} or {@link #abort}
+* <li>{@link #commitEnd}
+* <li>{@link #complete} including abort
+* </ul>
+* <br/>
+* or if the application aborts the transaction:
+* <ul>
+* <li>{@link #begin}
+* <li>{@link #abort}
+* <li>{@link #complete}
+* </ul>
+* <p>
+* {@link #complete} may be called out of sequence and it forces an abort if before
+* {@link #commitPrepare}. Once {@link #commitPrepare} has been called, the component
+* can not decide whether to commit finally or to cause a system abort; it must wait
+* for the coordinator. After {@link #commitEnd}, the coordinator has definitely
+* committed the overall transaction and local prepared state can be released, and changes
+* made to the permanent state of the component.
+*
+* @see Transaction
+* @see TransactionCoordinator
+*/
+
+public interface TransactionalComponent {
+    /**
+     * Every component <i>instance</i> must supply a unique number.
+     * It is used to route journal entries to subsystems, including across restarts/recovery.
+     * Uniqueness scope is within the same {@link TransactionCoordinator},
+     * and the same across restarts.
+     * <p>
+     * If a component imposes the rule of one-per-{@link TransactionCoordinator},
+     * the same number can be used (if different from all other component type instances).
+     * <p>
+     * If a component can have multiple instances per {@link TransactionCoordinator},
+     * for example indexes, each must have a unique instance id.
+     */
+    ComponentId getComponentId() ;
+
+    // ---- Recovery phase
+    void startRecovery() ;
+
+    /** Notification that {@code ref} was really committed and is being recovered.
+     *
+     * @param ref Same bytes as were written during prepare originally.
+     */
+    void recover(ByteBuffer ref) ;
+
+    /** End of the recovery phase */
+    void finishRecovery() ;
+
+    /** Indicate that no recovery is being done (the journal thinks everything was completed last time) */
+    void cleanStart() ;
+
+    // ---- Normal operation
+
+    /** Start a transaction for this component. */
+    void begin(Transaction transaction) ;
+
+    /** Promote a component in a transaction.
+     * <p>
+     *  May return "false" for "can't do that" if the transaction can not be promoted.
+     *  <p>
+     *  May throw {@link UnsupportedOperationException} if promotion is not supported.
+     */
+    boolean promote(Transaction transaction) ;
+
+    /** Prepare for a commit.
+     *  Returns some bytes that will be written to the journal.
+     *  The journal remains valid until {@link #commitEnd} is called.
+     */
+    ByteBuffer commitPrepare(Transaction transaction) ;
+
+    /** Commit a transaction (make durable).
+     * Other components may not have been committed yet and recovery may occur still.
+     * Permanent state should not be finalised until {@link #commitEnd}.
+     */
+    void commit(Transaction transaction) ;
+
+    /** Signal all commits on all components are done (the component can clear up now) */
+    void commitEnd(Transaction transaction) ;
+
+    /** Abort a transaction (undo the effect of a transaction) */
+    void abort(Transaction transaction) ;
+
+    /** Finalization - the coordinator will not mention the transaction again
+     *  although recovery after a crash may do so.
+     */
+    void complete(Transaction transaction) ;
+
+    // ---- End of operations
+
+    /** Detach this component from the transaction of the current thread
+     * and return some internal state that can be used in a future call of
+     * {@link #attach(SysTransState)}
+     * <p>
+     * After this call, the component is not in a transaction but the
+     * existing transaction still exists. The thread may start a new
+     * transaction; that transaction is completely independent of the
+     * detached transaction.
+     * <p>
+     * Returns {@code null} if the current thread is not in a transaction.
+     * The component may return null to indicate it has no state.
+     * The returned system state should be used in a call to {@link #attach(SysTransState)}
+     * and the transaction ended in the usual way.
+     *
+     */
+    SysTransState detach() ;
+
+    /** Set the current thread to be in the transaction.  The {@code systemState}
+     * must be obtained from a call of {@link #detach()}.
+     * This method can only be called once per {@code systemState}.
+     */
+    void attach(SysTransState systemState) ;
+
+    /** Shutdown component, aborting any in-progress transactions.
+     * This operation is not guaranteed to be called.
+     */
+    void shutdown() ;
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentBase.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentBase.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentBase.java
new file mode 100644
index 0000000..75f1b1b
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentBase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+
+import org.apache.jena.query.ReadWrite ;
+
+/**
+ * A transaction component that does nothing - can be used as a helper for
+ * management tasks hooked into the transaction component lifecycle but which
+ * are not stateful across restarts.
+ */
+public class TransactionalComponentBase<X> extends TransactionalComponentLifecycle<X> {
+
+    /** Passes the component id to the superclass constructor. */
+    public TransactionalComponentBase(ComponentId id) {
+        super(id) ;
+    }
+
+    // ---- Recovery phase: every step has an empty body.
+    @Override
+    public void startRecovery() {}
+
+    @Override
+    public void recover(ByteBuffer ref) {}
+
+    @Override
+    public void finishRecovery() {}
+
+    @Override
+    public void cleanStart() {}
+
+    // ---- Transaction lifecycle: the value-returning hooks return null, the others do nothing.
+    @Override
+    protected X _begin(ReadWrite readWrite, TxnId txnId) { return null ; }
+
+    @Override
+    protected X _promote(TxnId txnId, X state) { return null ; }
+
+    @Override
+    protected ByteBuffer _commitPrepare(TxnId txnId, X state) { return null ; }
+
+    @Override
+    protected void _commit(TxnId txnId, X state) {}
+
+    @Override
+    protected void _commitEnd(TxnId txnId, X state) {}
+
+    @Override
+    protected void _abort(TxnId txnId, X state) {}
+
+    @Override
+    protected void _complete(TxnId txnId, X state) {}
+
+    // ---- Shutdown: empty body.
+    @Override
+    protected void _shutdown() {}
+
+}
+