You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2017/10/03 19:34:13 UTC
[27/65] [abbrv] jena git commit: JENA-1397: Rename java packages
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/QuorumGenerator.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/QuorumGenerator.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/QuorumGenerator.java
new file mode 100644
index 0000000..d70322d
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/QuorumGenerator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import org.apache.jena.query.ReadWrite ;
+
+/**
+ * Generates the transaction quorum for a transaction as it begins.
+ */
+public interface QuorumGenerator {
+    /**
+     * Produce the quorum to use for a transaction that is starting.
+     *
+     * @param mode the read/write mode of the transaction being started
+     * @return the {@link ComponentGroup} acting as the quorum for that transaction
+     */
+    ComponentGroup genQuorum(ReadWrite mode) ;
+}
+
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrBase.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrBase.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrBase.java
new file mode 100644
index 0000000..3a16534
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrBase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+
+import org.apache.jena.atlas.lib.Closeable ;
+import org.apache.jena.atlas.lib.Sync ;
+import org.apache.jena.dboe.base.file.BufferChannel;
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
+
+/**
+ * Helper class for managing the persistent state of a transactional component.
+ * The persistent state is assumed to be a fixed size, or at least
+ * of known maximum size.
+ * May not be suitable for all transactional component implementations.
+ */
+public abstract class StateMgrBase implements Sync, Closeable {
+    /* Compare to TransBlob which is a similar idea but where it is a
+     * component in the transaction component group. That can lead to
+     * the wrong control, or at least unclear control, of the writing during
+     * the transaction commit cycle.
+     */
+
+    private static final Logger log = LoggerFactory.getLogger(StateMgrBase.class) ;
+
+    private final BufferChannel storage ;
+    // One ByteBuffer that is reused where possible.
+    // This is short-term usage
+    //   * get disk state -> deserialize
+    //   * serialize -> set disk state
+    // on a single thread (the transaction writer).
+    private ByteBuffer bb ;
+    // Is the internal state out of sync with the disk state?
+    private boolean dirty = false ;
+
+    /**
+     * Create a state manager over the given storage.
+     * No I/O is done here; subclasses call {@link #init()} once their
+     * default initial state is set up.
+     *
+     * @param storage   where the persistent state is read from and written to
+     * @param sizeBytes capacity of the internal, reused {@link ByteBuffer}
+     */
+    protected StateMgrBase(BufferChannel storage, int sizeBytes) {
+        this.bb = ByteBuffer.allocate(sizeBytes) ;
+        this.storage = storage ;
+    }
+
+    /**
+     * After the default initial state is known, call this, for example, at the
+     * end of the constructor. If the storage is not empty the state is read from it;
+     * otherwise a clean copy of the current in-memory state is written.
+     */
+    protected void init() {
+        if ( storage.isEmpty() )
+            writeState() ;
+        else
+            readState() ;
+    }
+
+    /**
+     * Serialize the necessary state into a ByteBuffer.
+     * The argument ByteBuffer can be used or a new one returned
+     * if it is the wrong size (e.g. too small). The returned one will become the
+     * recycled ByteBuffer. The returned ByteBuffer should have posn/limit
+     * delimiting the necessary space to write.
+     *
+     * @param bytes the recycled buffer, rewound before the call
+     * @return the buffer holding the serialized state; {@code null} is treated
+     *         as "the argument buffer was used"
+     */
+    protected abstract ByteBuffer serialize(ByteBuffer bytes) ;
+
+    /**
+     * Deserialize the persistent state from the ByteBuffer (delimited by posn/limit).
+     * When called from {@link #readState()} the byte buffer is the recycled one
+     * from last time; when called from {@link #setState(ByteBuffer)} it is the
+     * caller's buffer.
+     *
+     * @param bytes the buffer to read the state from, rewound before the call
+     */
+    protected abstract void deserialize(ByteBuffer bytes) ;
+
+    /** Note that the in-memory state is not known to be the same
+     * as the on-disk state.
+     */
+    protected void setDirtyFlag() {
+        dirty = true ;
+    }
+
+    /**
+     * Event call for state writing. Called after successful
+     * writing of the state.
+     */
+    protected abstract void writeStateEvent() ;
+
+    /**
+     * Event call for state reading. Called after successful
+     * deserializing of the state.
+     */
+    protected abstract void readStateEvent() ;
+
+    /** Note that the in-memory state is the same
+     * as the on-disk state, or at least the on-disk state is
+     * acceptable for restart at any time.
+     */
+    protected void clearDirtyFlag() {
+        dirty = false ;
+    }
+
+    /** Low level control - for example, used for cloning setup.
+     * Use with care.
+     *
+     * @return the underlying storage channel
+     */
+    public BufferChannel getBufferChannel() {
+        return storage ;
+    }
+
+    /**
+     * Rewind the recycled buffer, call {@link #serialize}, and adopt whatever
+     * buffer it hands back as the new recycled buffer.
+     * The position/limit of the result are exactly as {@code serialize} left them.
+     */
+    private ByteBuffer serializeToBuffer() {
+        bb.rewind() ;
+        ByteBuffer result = serialize(bb) ;
+        // serialize() may return a replacement (e.g. larger) buffer; keep it for reuse.
+        if ( result != null )
+            bb = result ;
+        return bb ;
+    }
+
+    /** Return the serialized state using the internal ByteBuffer.
+     * Typically called by "prepare" for the bytes to write to the journal.
+     * Calls {@link #serialize}; if that returns a replacement buffer, the
+     * replacement is what is returned (and recycled), consistent with {@link #writeState()}.
+     * This method does not perform any external I/O.
+     *
+     * @return the buffer holding the serialized state, with position/limit as left by {@code serialize}
+     */
+    public ByteBuffer getState() {
+        return serializeToBuffer() ;
+    }
+
+    /** Set the in-memory state from a ByteBuffer, for example, from journal recovery.
+     * This method does not perform any external I/O.
+     * Call "writeState" to put the in-memory state as the disk state.
+     *
+     * @param buff the serialized state; it is rewound before being deserialized
+     */
+    public void setState(ByteBuffer buff) {
+        buff.rewind() ;
+        deserialize(buff) ;
+        dirty = true ;
+    }
+
+    /** The write process : serialize, write, sync.
+     * After this, the bytes definitely are on disk, not in some OS cache.
+     * The dirty flag is cleared and {@link #writeStateEvent()} is called.
+     */
+    public void writeState() {
+        serializeToBuffer() ;
+        // Write from the start of the buffer, at offset 0 of the storage.
+        bb.rewind() ;
+        storage.write(bb, 0) ;
+        storage.sync() ;
+        dirty = false ;
+        writeStateEvent() ;
+    }
+
+    /** The read process : get all bytes on disk, deserialize.
+     * {@link #readStateEvent()} is called afterwards.
+     */
+    public void readState() {
+        bb.rewind() ;
+        storage.read(bb, 0) ;
+        bb.rewind() ;
+        deserialize(bb) ;
+        readStateEvent() ;
+    }
+
+    /** Write the state to storage, but only if it has been marked dirty. */
+    @Override
+    public void sync() {
+        if ( dirty )
+            writeState() ;
+    }
+
+    /** Close the underlying storage. Pending (dirty) state is not written by this call. */
+    @Override
+    public void close() {
+        storage.close() ;
+    }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrData.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrData.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrData.java
new file mode 100644
index 0000000..3adc89b
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrData.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+import java.util.Arrays ;
+
+import org.apache.jena.dboe.base.file.BufferChannel;
+
+/** State management for a number of long values, a common need
+ * (might as well store ints as long on disk
+ * for small numbers of integers).
+ */
+public class StateMgrData extends StateMgrBase {
+    private final long[] data ;
+
+    /**
+     * Create the state manager and immediately initialize it from storage:
+     * existing on-disk state is read, otherwise {@code initialData} is written.
+     *
+     * @param storage     where the values are persisted
+     * @param initialData default values; the array is copied, and its length
+     *                    fixes the number of values managed
+     */
+    public StateMgrData(BufferChannel storage, long... initialData) {
+        super(storage, numBytes(initialData)) ;
+        data = copy(initialData) ;
+        // Must be the superclass version: init() is disabled in this class.
+        super.init() ;
+    }
+
+    /**
+     * Initialization is done by the constructor; calling this again is an error.
+     *
+     * @throws TransactionException always
+     */
+    @Override
+    protected void init() { throw new TransactionException("Don't call init()") ; }
+
+    private static long[] copy(long[] data) { return Arrays.copyOf(data, data.length) ; }
+
+    private static int numBytes(long[] data) { return data.length * Long.BYTES ; }
+
+    // Protected - leave whether to expose these operations as "public"
+    // to the subclass. A subclass may choose instead to map these
+    // to more meaningful names, or ensure that data consistency rules are applied.
+
+    /** @return a copy of the current values */
+    protected long[] getData() {
+        return copy(data) ;
+    }
+
+    /**
+     * Replace all the values and mark the state as dirty,
+     * in the same way as {@link #set(int, long)}.
+     *
+     * @param newData the new values; must have the same length as the managed data
+     * @throws IllegalArgumentException if the lengths differ
+     */
+    protected void setData(long... newData) {
+        if ( newData.length != data.length )
+            throw new IllegalArgumentException("Expected "+data.length+" values, got "+newData.length) ;
+        System.arraycopy(newData, 0, data, 0, data.length) ;
+        // The in-memory state has changed: sync() must write it out.
+        super.setDirtyFlag() ;
+    }
+
+    /**
+     * @param i index of the value
+     * @return the value at index {@code i}
+     */
+    protected long get(int i) {
+        return data[i] ;
+    }
+
+    /**
+     * Set one value and mark the state as dirty.
+     *
+     * @param i index of the value
+     * @param v the new value
+     */
+    protected void set(int i, long v) {
+        data[i] = v ;
+        super.setDirtyFlag() ;
+    }
+
+    /** Write every value, in index order, as a long into {@code bytes} and return that same buffer. */
+    @Override
+    protected ByteBuffer serialize(ByteBuffer bytes) {
+        for ( long v : data )
+            bytes.putLong(v) ;
+        return bytes ;
+    }
+
+    /** Read one long per managed value, in index order, from {@code bytes}. */
+    @Override
+    protected void deserialize(ByteBuffer bytes) {
+        for ( int i = 0 ; i < data.length ; i++ )
+            data[i] = bytes.getLong() ;
+    }
+
+    /** No action on state write. */
+    @Override
+    protected void writeStateEvent() {}
+
+    /** No action on state read. */
+    @Override
+    protected void readStateEvent() {}
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrDataIdx.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrDataIdx.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrDataIdx.java
new file mode 100644
index 0000000..0d208ab
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/StateMgrDataIdx.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import org.apache.jena.dboe.base.file.BufferChannel;
+
+/**
+ * State management for a number of long values,
+ * with the state accessed by index.
+ * This class only widens the inherited accessors of {@link StateMgrData} to public.
+ */
+public final class StateMgrDataIdx extends StateMgrData {
+
+    /**
+     * @param storage     passed through to {@link StateMgrData}
+     * @param initialData passed through to {@link StateMgrData}
+     */
+    public StateMgrDataIdx(BufferChannel storage, long... initialData) {
+        super(storage, initialData) ;
+    }
+
+    // ---- Inherited operations, made public.
+
+    /** Public form of the inherited {@code getData()}. */
+    @Override
+    public long[] getData() {
+        return super.getData() ;
+    }
+
+    /** Public form of the inherited {@code setData(long...)}. */
+    @Override
+    public void setData(long... newData) {
+        super.setData(newData) ;
+    }
+
+    /** Public form of the inherited {@code get(int)}. */
+    @Override
+    public long get(int i) {
+        return super.get(i) ;
+    }
+
+    /** Public form of the inherited {@code set(int, long)}. */
+    @Override
+    public void set(int i, long v) {
+        super.set(i, v) ;
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTrans.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTrans.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTrans.java
new file mode 100644
index 0000000..9af4b21
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTrans.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+
+/**
+ * Single component aspect of a transaction: pairs one
+ * {@link TransactionalComponent} with the {@link Transaction} it is taking part in
+ * and forwards the lifecycle calls to the component.
+ */
+final class SysTrans {
+    private final TransactionalComponent component ;
+    private final Transaction txn ;
+    private final TxnId txnId ;
+
+    public SysTrans(TransactionalComponent elt, Transaction transaction, TxnId txnId) {
+        this.component = elt ;
+        this.txn = transaction ;
+        this.txnId = txnId ;
+    }
+
+    // ---- Lifecycle: each call is forwarded to the component with this transaction.
+
+    /** Does nothing; the component is not called. */
+    public void begin() { }
+
+    /** @return the result of the component's {@code promote} call */
+    public boolean promote() {
+        return component.promote(txn) ;
+    }
+
+    /** @return the bytes returned by the component's {@code commitPrepare} call */
+    public ByteBuffer commitPrepare() {
+        return component.commitPrepare(txn) ;
+    }
+
+    public void commit() {
+        component.commit(txn) ;
+    }
+
+    public void commitEnd() {
+        component.commitEnd(txn) ;
+    }
+
+    public void abort() {
+        component.abort(txn) ;
+    }
+
+    public void complete() {
+        component.complete(txn) ;
+    }
+
+    // ---- Accessors
+
+    public Transaction getTransaction() {
+        return txn ;
+    }
+
+    public TxnId getTxnId() {
+        return txnId ;
+    }
+
+    public TransactionalComponent getComponent() {
+        return component ;
+    }
+
+    /** @return the id reported by the component itself */
+    public ComponentId getComponentId() {
+        return component.getComponentId() ;
+    }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTransState.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTransState.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTransState.java
new file mode 100644
index 0000000..cb6ad9a
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/SysTransState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+/**
+ * State for one detached component: an immutable holder of a component,
+ * a transaction and an opaque state object.
+ */
+public class SysTransState {
+    private final TransactionalComponent component ;
+    private final Transaction txn ;
+    private final Object componentState ;
+
+    /**
+     * @param elt         the component
+     * @param transaction the transaction
+     * @param state       the state object; stored as given, not interpreted here
+     */
+    public SysTransState(TransactionalComponent elt, Transaction transaction, Object state) {
+        this.component = elt ;
+        this.txn = transaction ;
+        this.componentState = state ;
+    }
+
+    public TransactionalComponent getComponent() {
+        return component ;
+    }
+
+    public Transaction getTransaction() {
+        return txn ;
+    }
+
+    public Object getState() {
+        return componentState ;
+    }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/Transaction.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/Transaction.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/Transaction.java
new file mode 100644
index 0000000..fe6b2a9
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/Transaction.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import static org.apache.jena.dboe.transaction.txn.TxnState.ABORTED;
+import static org.apache.jena.dboe.transaction.txn.TxnState.ACTIVE;
+import static org.apache.jena.dboe.transaction.txn.TxnState.COMMIT;
+import static org.apache.jena.dboe.transaction.txn.TxnState.COMMITTED;
+import static org.apache.jena.dboe.transaction.txn.TxnState.DETACHED;
+import static org.apache.jena.dboe.transaction.txn.TxnState.END_ABORTED;
+import static org.apache.jena.dboe.transaction.txn.TxnState.END_COMMITTED;
+import static org.apache.jena.dboe.transaction.txn.TxnState.INACTIVE;
+import static org.apache.jena.dboe.transaction.txn.TxnState.PREPARE;
+
+import java.util.List ;
+import java.util.Objects ;
+import java.util.concurrent.atomic.AtomicReference ;
+
+import org.apache.jena.query.ReadWrite ;
+
+/**
+ * A transaction as the composition of actions on components.
+ * Works in conjunction with the TransactionCoordinator
+ * to provide the transaction lifecycle.
+ * @see TransactionCoordinator
+ * @see TransactionalComponent
+ */
+public final class Transaction implements TransactionInfo {
+    // Using an AtomicReference<TxnState> requires that
+    // TransactionalComponentLifecycle.internalComplete
+    // frees the thread local for the threadTxn, otherwise memory
+    // usage grows. If a plain member variable is used slow growth
+    // is still seen.
+    // Nulling txnMgr and clearing components stops that slow growth.
+
+    private TransactionCoordinator txnMgr ;
+    private final TxnId txnId ;
+    private final List<SysTrans> components ;
+
+    // Using an AtomicReference makes this observable from the outside.
+    // It also allows for multithreaded transactions (later).
+    private final AtomicReference<TxnState> state = new AtomicReference<>() ;
+    private long dataVersion ;
+    private ReadWrite mode ;
+
+    /**
+     * Create a transaction in state {@code INACTIVE}; call {@link #begin()} to start it.
+     *
+     * @param txnMgr      the coordinator this transaction calls back into; not null
+     * @param txnId       the transaction id; not null
+     * @param readWrite   the initial mode; not null
+     * @param dataVersion the data version (see {@link #getDataVersion()})
+     * @param components  the per-component parts of this transaction; not null
+     * @throws NullPointerException if any object argument is null
+     */
+    public Transaction(TransactionCoordinator txnMgr, TxnId txnId, ReadWrite readWrite, long dataVersion, List<SysTrans> components) {
+        Objects.requireNonNull(txnMgr) ;
+        Objects.requireNonNull(txnId) ;
+        Objects.requireNonNull(readWrite) ;
+        Objects.requireNonNull(components) ;
+        this.txnMgr = txnMgr ;
+        this.txnId = txnId ;
+        this.mode = readWrite ;
+        this.dataVersion = dataVersion ;
+        this.components = components ;
+        setState(INACTIVE) ;
+    }
+
+    /*package*/ void resetDataVersion(long dataVersion) {
+        this.dataVersion = dataVersion ;
+    }
+
+    /*package*/ void setState(TxnState newState) {
+        state.set(newState) ;
+    }
+
+    @Override
+    public TxnState getState() {
+        return state.get() ;
+    }
+
+    /**
+     * Each transaction is allocated a serialization point by the transaction
+     * coordinator. Normally, this is related to this number and it increases
+     * over time as the data changes. Two readers can have the same
+     * serialization point - they are working with the same view of the data.
+     */
+    @Override
+    public long getDataVersion() {
+        return dataVersion ;
+    }
+
+    /**
+     * Start the transaction: {@code INACTIVE} to {@code ACTIVE}.
+     *
+     * @throws TransactionException if the transaction is not {@code INACTIVE}
+     */
+    public void begin() {
+        checkState(INACTIVE) ;
+        components.forEach(SysTrans::begin) ;
+        setState(ACTIVE) ;
+    }
+
+    /**
+     * Ask the coordinator to promote this transaction to a write transaction.
+     *
+     * @return true if the coordinator promoted the transaction (the mode is then WRITE),
+     *         false if it did not (the mode is unchanged)
+     * @throws TransactionException if the transaction is not {@code ACTIVE}
+     */
+    public boolean promote() {
+        checkState(ACTIVE) ;
+        if ( ! txnMgr.promoteTxn(this) )
+            return false ;
+        mode = ReadWrite.WRITE ;
+        return true ;
+    }
+
+    /*package*/ void promoteComponents() {
+        // Call back from the Transaction coordinator during promote.
+        for ( SysTrans c : components ) {
+            if ( ! c.promote() )
+                throw new TransactionException("Failed to promote") ;
+        }
+        mode = ReadWrite.WRITE ;
+    }
+
+    /**
+     * Signal that an update is about to happen. A READ transaction is promoted;
+     * a WRITE transaction is left as it is.
+     *
+     * @throws TransactionException if the transaction is not {@code ACTIVE},
+     *         or if it is a READ transaction and promotion fails
+     */
+    public void notifyUpdate() {
+        checkState(ACTIVE) ;
+        if ( mode == ReadWrite.READ ) {
+            // Only become a writer when the coordinator actually granted the promotion.
+            if ( ! promote() )
+                throw new TransactionException("Failed to promote") ;
+            mode = ReadWrite.WRITE ;
+        }
+    }
+
+    /**
+     * Prepare phase: {@code ACTIVE} to {@code PREPARE}.
+     * The coordinator's prepare step is run for WRITE transactions only.
+     *
+     * @throws TransactionException if the transaction is not {@code ACTIVE}
+     */
+    public void prepare() {
+        checkState(ACTIVE) ;
+        if ( mode == ReadWrite.WRITE )
+            txnMgr.executePrepare(this) ;
+        setState(PREPARE) ;
+    }
+
+    /**
+     * Commit the transaction, running {@link #prepare()} first if that has not
+     * been done. On return the state is {@code COMMITTED}.
+     *
+     * @throws TransactionException if the transaction is neither {@code ACTIVE} nor {@code PREPARE}
+     */
+    public void commit() {
+        if ( getState() == ACTIVE )
+            // Auto exec prepare().
+            prepare() ;
+        checkState(PREPARE) ;
+        setState(COMMIT) ;
+        // READ and WRITE transactions currently go through the same commit lifecycle.
+        txnMgr.executeCommit(this,
+                             () -> components.forEach(SysTrans::commit),
+                             () -> components.forEach(SysTrans::commitEnd)) ;
+        setState(COMMITTED) ;
+    }
+
+    /**
+     * Abort the transaction and run the completion step.
+     *
+     * @throws TransactionException if the transaction is neither {@code ACTIVE} nor {@code ABORTED}
+     */
+    public void abort() {
+        abort$() ;
+        endInternal() ;
+    }
+
+    private void abort$() {
+        checkState(ACTIVE, ABORTED) ;
+        // Includes notification start/finish
+        txnMgr.executeAbort(this, () -> components.forEach(SysTrans::abort)) ;
+        setState(ABORTED) ;
+    }
+
+    /**
+     * End the transaction. The completion step is run if it has not been already,
+     * and the reference to the coordinator is then dropped, so this must be
+     * the last lifecycle call and must not be repeated.
+     *
+     * @throws TransactionException if this is a write transaction that is still
+     *         {@code ACTIVE}, i.e. neither commit nor abort was called
+     */
+    public void end() {
+        txnMgr.notifyEndStart(this) ;
+        if ( isWriteTxn() && getState() == ACTIVE )
+            // A write transaction must be explicitly committed or aborted; no implicit abort.
+            throw new TransactionException("Write transaction with no commit or abort") ;
+        endInternal() ;
+        txnMgr.notifyEndFinish(this) ;
+        // Release the coordinator (see the note on memory growth at the top of the class).
+        txnMgr = null ;
+    }
+
+    private void endInternal() {
+        if ( hasFinalised() )
+            return ;
+        // Called once, at the first abort/commit/end.
+        txnMgr.notifyCompleteStart(this) ;
+        components.forEach(SysTrans::complete) ;
+        txnMgr.completed(this) ;
+        // Anything other than a completed commit finishes as END_ABORTED.
+        setState( getState() == COMMITTED ? END_COMMITTED : END_ABORTED ) ;
+        txnMgr.notifyCompleteFinish(this) ;
+    }
+
+    /*package*/ List<SysTrans> getComponents() {
+        return components ;
+    }
+
+    /*package*/ void detach() {
+        checkState(ACTIVE, PREPARE) ;
+        setState(DETACHED) ;
+    }
+
+    /*package*/ void attach() {
+        checkState(DETACHED) ;
+        setState(ACTIVE) ;
+    }
+
+    /**
+     * Check this is an {@code ACTIVE} write transaction.
+     *
+     * @throws TransactionException if the transaction is not {@code ACTIVE} or its mode is not WRITE
+     */
+    public void requireWriteTxn() {
+        checkState(ACTIVE) ;
+        if ( mode != ReadWrite.WRITE )
+            throw new TransactionException("Not a write transaction") ;
+    }
+
+    /**
+     * Has {@link #begin()} been called? True in every state other than
+     * {@code INACTIVE}, the state set by the constructor.
+     */
+    @Override
+    public boolean hasStarted() {
+        return getState() != INACTIVE ;
+    }
+
+    /** Is the state one of {@code COMMITTED}, {@code ABORTED}, {@code END_COMMITTED} or {@code END_ABORTED}? */
+    @Override
+    public boolean hasFinished() {
+        TxnState x = getState() ;
+        return x == COMMITTED || x == ABORTED || x == END_COMMITTED || x == END_ABORTED ;
+    }
+
+    /** Has the completion step run, i.e. is the state {@code END_COMMITTED} or {@code END_ABORTED}? */
+    @Override
+    public boolean hasFinalised() {
+        TxnState x = getState() ;
+        return x == END_COMMITTED || x == END_ABORTED ;
+    }
+
+    @Override
+    public TxnId getTxnId() { return txnId ; }
+
+    @Override
+    public ReadWrite getMode() { return mode ; }
+
+    /** Is this a READ transaction?
+     * Convenience operation equivalent to {@code (getMode() == READ)}
+     */
+    @Override
+    public boolean isReadTxn() { return mode == ReadWrite.READ ; }
+
+    /** Is this a WRITE transaction?
+     * Convenience operation equivalent to {@code (getMode() == WRITE)}
+     */
+    @Override
+    public boolean isWriteTxn() { return mode == ReadWrite.WRITE ; }
+
+    // hashCode/equality
+    // These must be object equality. No two transactions objects are .equals unless they are ==
+
+    private void checkWriteTxn() {
+        if ( ! isActiveTxn() || ! isWriteTxn() )
+            throw new TransactionException("Not in a write transaction") ;
+    }
+
+    // XXX Duplicate -- TransactionalComponentLifecycle
+    private void checkState(TxnState expected) {
+        TxnState s = getState() ;
+        if ( s != expected )
+            throw new TransactionException("Transaction is in state "+s+": expected state "+expected) ;
+    }
+
+    private void checkState(TxnState expected1, TxnState expected2) {
+        TxnState s = getState() ;
+        if ( s != expected1 && s != expected2 )
+            throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+" or "+expected2) ;
+    }
+
+    // Avoid varargs ... undue worry?
+    private void checkState(TxnState expected1, TxnState expected2, TxnState expected3) {
+        TxnState s = getState() ;
+        if ( s != expected1 && s != expected2 && s != expected3 )
+            throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+", "+expected2+" or "+expected3) ;
+    }
+
+    /** True in every state other than {@code INACTIVE}. */
+    @Override
+    public boolean isActiveTxn() {
+        return getState() != INACTIVE ;
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
new file mode 100644
index 0000000..fc47998
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import static org.apache.jena.dboe.transaction.txn.journal.JournalEntryType.UNDO;
+import static org.apache.jena.query.ReadWrite.WRITE ;
+
+import java.nio.ByteBuffer ;
+import java.util.ArrayList ;
+import java.util.Iterator ;
+import java.util.List ;
+import java.util.Objects ;
+import java.util.concurrent.ConcurrentHashMap ;
+import java.util.concurrent.Semaphore ;
+import java.util.concurrent.atomic.AtomicLong ;
+import java.util.concurrent.locks.ReadWriteLock ;
+import java.util.concurrent.locks.ReentrantReadWriteLock ;
+
+import org.apache.jena.atlas.logging.Log ;
+import org.apache.jena.dboe.base.file.Location;
+import org.apache.jena.dboe.sys.Sys;
+import org.apache.jena.dboe.transaction.txn.journal.Journal;
+import org.apache.jena.dboe.transaction.txn.journal.JournalEntry;
+import org.apache.jena.query.ReadWrite ;
+import org.slf4j.Logger ;
+
+/**
+ * One {@code TransactionCoordinator} per group of {@link TransactionalComponent}s.
+ * {@link TransactionalComponent}s can not be shared across TransactionCoordinators.
+ * <p>
+ * This is a general engine although tested and most used for multiple-reader
+ * and single-writer (MR+SW). {@link TransactionalComponentLifecycle} provides the
+ * per-thread style.
+ * <p>
+ * Contrast to MRSW: multiple-reader or single-writer.
+ * <h3>Block writers</h3>
+ * Block until no writers are active.
+ * When this returns, this guarantees that the database is not changing
+ * and the journal is flushed to disk.
+ * <p>
+ * See {@link #blockWriters()}, {@link #enableWriters()}, {@link #execAsWriter(Runnable)}
+ * <h3>Exclusive mode</h3>
+ * Exclusive mode is when the current thread is the only active code : no readers, no writers.
+ * <p>
+ * See {@link #startExclusiveMode()}/{@link #tryExclusiveMode()} {@link #finishExclusiveMode()}, {@link #execExclusive(Runnable)}
+ *
+ * @see Transaction
+ * @see TransactionalComponent
+ * @see TransactionalSystem
+ */
+final
+public class TransactionCoordinator {
+ private static Logger log = Sys.syslog ;
+
+ private final Journal journal ;
+ private boolean coordinatorStarted = false ;
+
+ private final ComponentGroup components = new ComponentGroup() ;
+ // Components
+ private ComponentGroup txnComponents = null ;
+ private List<ShutdownHook> shutdownHooks ;
+ private TxnIdGenerator txnIdGenerator = TxnIdFactory.txnIdGenSimple ;
+
+ private QuorumGenerator quorumGenerator = null ;
+ //private QuorumGenerator quorumGenerator = (m) -> components ;
+
+ // Semaphore to implement "Single Active Writer" - independent of readers
+ // This is not reentrant.
+ private Semaphore writersWaiting = new Semaphore(1, true) ;
+
+ // All transaction need a "read" lock through out their lifetime.
+ // Do not confuse with read/write transactions. We need a
+ // "one exclusive, or many other" lock which happens to be called ReadWriteLock
+ // See also {@code lock} which protects the datastructures during transaction management.
+ private ReadWriteLock exclusivitylock = new ReentrantReadWriteLock() ;
+
+ // Coordinator wide lock object.
+ private Object coordinatorLock = new Object() ;
+
+ @FunctionalInterface
+ public interface ShutdownHook { void shutdown() ; }
+
+ /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */
+ public TransactionCoordinator(Location location) {
+     // Delegate using the journal obtained from Journal.create for this location.
+     this( Journal.create(location) ) ;
+ }
+
+ /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */
+ public TransactionCoordinator(Journal journal) {
+     // No initial components (null) and an empty list of shutdown hooks.
+     this( journal, null, new ArrayList<>() ) ;
+ }
+
+ /** Create a TransactionCoordinator, initially with {@link TransactionalComponent} in the ComponentGroup */
+ public TransactionCoordinator(Journal journal, List<TransactionalComponent> components) {
+     // The given components are registered; the shutdown hook list starts empty.
+     this( journal, components, new ArrayList<>() ) ;
+ }
+
+ // /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */
+// public TransactionCoordinator(Location journalLocation) {
+// this(Journal.create(journalLocation), new ArrayList<>() , new ArrayList<>()) ;
+// }
+
+ private TransactionCoordinator(Journal journal, List<TransactionalComponent> txnComp, List<ShutdownHook> shutdownHooks) {
+     this.journal = journal ;
+     // Copy the hooks so that later add/remove calls work on a list owned by this object.
+     this.shutdownHooks = new ArrayList<>(shutdownHooks) ;
+     // A null component list means "start with no components".
+     if ( txnComp == null )
+         return ;
+     for ( TransactionalComponent component : txnComp )
+         components.add(component) ;
+ }
+
+ /** Add a {@link TransactionalComponent}.
+ * Safe to call at any time but it is good practice to add all the
+ * components before any transactions start.
+ * Internally, the coordinator ensures the add will safely happen but it
+ * does not add the component to existing transactions.
+ * This must be setup before recovery is attempted.
+ */
+ public TransactionCoordinator add(TransactionalComponent elt) {
+     // checkSetup() throws once the coordinator has been started.
+     checkSetup() ;
+     synchronized(coordinatorLock) { components.add(elt) ; }
+     // Return this coordinator so calls can be chained.
+     return this ;
+ }
+
+ /**
+ * Remove a {@link TransactionalComponent}.
+ * @see #add
+ */
+ public TransactionCoordinator remove(TransactionalComponent elt) {
+     // checkSetup() throws once the coordinator has been started.
+     checkSetup() ;
+     // Components are removed by their component id.
+     synchronized(coordinatorLock) { components.remove(elt.getComponentId()) ; }
+     return this ;
+ }
+
+ /**
+ * Add a shutdown hook. Shutdown is not guaranteed to be called
+ * and hence hooks may not get called.
+ */
+ public void add(TransactionCoordinator.ShutdownHook hook) {
+     // checkSetup() throws once the coordinator has been started.
+     checkSetup() ;
+     synchronized(coordinatorLock) { shutdownHooks.add(hook) ; }
+ }
+
+ /** Remove a shutdown hook */
+ public void remove(TransactionCoordinator.ShutdownHook hook) {
+     // checkSetup() throws once the coordinator has been started.
+     checkSetup() ;
+     synchronized(coordinatorLock) { shutdownHooks.remove(hook) ; }
+ }
+
+ public void setQuorumGenerator(QuorumGenerator qGen) {
+     // Configuration step: rejected by checkSetup() once the coordinator has started.
+     checkSetup() ;
+     quorumGenerator = qGen ;
+ }
+
+ public void start() {
+ checkSetup() ;
+ recovery() ;
+ coordinatorStarted = true ;
+ }
+
+ private /*public*/ void recovery() {
+     Iterator<JournalEntry> journalEntries = journal.entries() ;
+     if ( ! journalEntries.hasNext() ) {
+         // Empty journal: nothing to replay, tell each component it is a clean start.
+         components.forEachComponent(c -> c.cleanStart()) ;
+         return ;
+     }
+
+     log.info("Journal recovery start") ;
+     components.forEachComponent(c -> c.startRecovery()) ;
+
+     // REDO/UNDO entries accumulate here until a COMMIT (replay them)
+     // or an ABORT (discard them) is seen.
+     List<JournalEntry> pending = new ArrayList<>() ;
+
+     while ( journalEntries.hasNext() ) {
+         JournalEntry entry = journalEntries.next() ;
+         switch(entry.getType()) {
+             case ABORT :
+                 pending.clear() ;
+                 break ;
+             case COMMIT :
+                 recover(pending) ;
+                 pending.clear() ;
+                 break ;
+             case REDO : case UNDO :
+                 pending.add(entry) ;
+                 break ;
+         }
+     }
+
+     components.forEachComponent(c -> c.finishRecovery()) ;
+     journal.reset() ;
+     log.info("Journal recovery end") ;
+ }
+
+ private void recover(List<JournalEntry> entries) {
+     // Hand each journal entry's payload to the component it belongs to.
+     entries.forEach(entry -> {
+         boolean isUndo = ( entry.getType() == UNDO ) ;
+         if ( isUndo ) {
+             // UNDO entries are only reported, never applied.
+             Log.warn(TransactionCoordinator.this, "UNDO entry : not handled") ;
+             return ;
+         }
+         ComponentId componentId = entry.getComponentId() ;
+         ByteBuffer payload = entry.getByteBuffer() ;
+         TransactionalComponent target = components.findComponent(componentId) ;
+         if ( target != null ) {
+             target.recover(payload) ;
+             return ;
+         }
+         // No registered component for this id: warn and skip the entry.
+         Log.warn(TransactionCoordinator.this, "No component for "+componentId) ;
+     }) ;
+ }
+
+ public void setTxnIdGenerator(TxnIdGenerator generator) {
+     // Replaces the generator used for transaction ids; note there is no checkSetup() guard here.
+     txnIdGenerator = generator ;
+ }
+
+ public Journal getJournal() {
+     // The journal this coordinator was constructed with.
+     return this.journal ;
+ }
+
+ public TransactionCoordinatorState detach(Transaction txn) {
+     // Detach the transaction itself first, then every component, collecting
+     // each component's detached state under its component id.
+     txn.detach() ;
+     TransactionCoordinatorState saved = new TransactionCoordinatorState(txn) ;
+     components.forEach((componentId, component) ->
+         saved.componentStates.put(componentId, component.detach())
+     ) ;
+     // The txn still counts as "active" for tracking purposes below.
+     return saved ;
+ }
+
+ public void attach(TransactionCoordinatorState coordinatorState) {
+     // Re-attach the transaction, then give each component back the state saved for it.
+     coordinatorState.transaction.attach() ;
+     coordinatorState.componentStates.forEach((componentId, savedState) -> {
+         TransactionalComponent component = components.findComponent(componentId) ;
+         component.attach(savedState) ;
+     }) ;
+ }
+
+ public void shutdown() {
+ if ( coordinatorLock == null )
+ return ;
+ components.forEach((id, c) -> c.shutdown()) ;
+ shutdownHooks.forEach((h)-> h.shutdown()) ;
+ coordinatorLock = null ;
+ journal.close();
+ }
+
+ // Are we in the initialization phase?
+ private void checkSetup() {
+     // Setup operations are legal only until start() has set coordinatorStarted.
+     if ( ! coordinatorStarted )
+         return ;
+     throw new TransactionException("TransactionCoordinator has already been started") ;
+ }
+
+ // Are we up and running?
+ private void checkActive() {
+     // Must have been started and must not have been shut down since.
+     if ( coordinatorStarted ) {
+         checkNotShutdown() ;
+         return ;
+     }
+     throw new TransactionException("TransactionCoordinator has not been started") ;
+ }
+
+ // Check not wrapped up
+ private void checkNotShutdown() {
+     // shutdown() sets coordinatorLock to null, so a non-null lock means still open.
+     if ( coordinatorLock != null )
+         return ;
+     throw new TransactionException("TransactionCoordinator has been shutdown") ;
+ }
+
+ private void releaseWriterLock() {
+     // The semaphore has one permit; if it is available now, nobody holds the
+     // writer lock and this release is unbalanced.
+     if ( writersWaiting.availablePermits() != 0 )
+         throw new TransactionException("TransactionCoordinator: Probably mismatch of enable/disableWriter calls") ;
+     writersWaiting.release() ;
+ }
+
+ /** Acquire the writer lock - return true if succeeded */
+ private boolean acquireWriterLock(boolean canBlock) {
+     // Non-blocking: take the single writer permit only if it is free right now.
+     if ( ! canBlock )
+         return writersWaiting.tryAcquire() ;
+     try {
+         // Blocking: wait until the writer permit becomes available.
+         writersWaiting.acquire() ;
+         return true ;
+     } catch (InterruptedException e) {
+         // Restore the interrupt status before converting to an unchecked exception,
+         // so code further up the stack can still observe the interruption.
+         Thread.currentThread().interrupt() ;
+         throw new TransactionException(e) ;
+     }
+ }
+
+ /** Enter exclusive mode; block if necessary.
+ * There are no active transactions on return; new transactions will be held up in 'begin'.
+ * Return to normal (release waiting transactions, allow new transactions)
+ * with {@link #finishExclusiveMode}.
+ * <p>
+ * Do not call inside an existing transaction.
+ */
+ public void startExclusiveMode() {
+     // Blocking form: the boolean result of the private overload is always true here and is ignored.
+     final boolean canBlock = true ;
+     startExclusiveMode(canBlock) ;
+ }
+
+ /** Try to enter exclusive mode.
+ * If return is true, then there are no active transactions on return and new transactions will be held up in 'begin'.
+ * If false, there were in-progress transactions.
+ * Return to normal (release waiting transactions, allow new transactions)
+ * with {@link #finishExclusiveMode}.
+ * <p>
+ * Do not call inside an existing transaction.
+ */
+ public boolean tryExclusiveMode() {
+     // Non-blocking attempt.
+     final boolean canBlock = false ;
+     return tryExclusiveMode(canBlock) ;
+ }
+
+ /** Try to enter exclusive mode.
+ * If return is true, then there are no active transactions on return and new transactions will be held up in 'begin'.
+ * If false, there were in-progress transactions.
+ * Return to normal (release waiting transactions, allow new transactions)
+ * with {@link #finishExclusiveMode}.
+ * <p>
+ * Do not call inside an existing transaction.
+ * @param canBlock Allow the operation block and wait for the exclusive mode lock.
+ */
+ public boolean tryExclusiveMode(boolean canBlock) {
+ return startExclusiveMode(canBlock);
+ }
+
+ private boolean startExclusiveMode(boolean canBlock) {
+     // Exclusive mode is the write side of exclusivitylock; transactions hold the read side.
+     if ( ! canBlock )
+         return exclusivitylock.writeLock().tryLock() ;
+     exclusivitylock.writeLock().lock() ;
+     return true ;
+ }
+
+ /** Return to normal (release waiting transactions, allow new transactions).
+ * Must be paired with an earlier {@link #startExclusiveMode}.
+ */
+ public void finishExclusiveMode() {
+     // Give up the write side of exclusivitylock taken when exclusive mode was entered.
+     exclusivitylock.writeLock().unlock() ;
+ }
+
+ /** Execute an action in exclusive mode. This method can block.
+ * Equivalent to:
+ * <pre>
+ * startExclusiveMode() ;
+ * try { action.run(); }
+ * finally { finishExclusiveMode(); }
+ * </pre>
+ *
+ * @param action
+ */
+ public void execExclusive(Runnable action) {
+ startExclusiveMode() ;
+ try { action.run(); }
+ finally { finishExclusiveMode(); }
+ }
+
+ /** Block until no writers are active.
+ * When this returns, this guarantees that the database is not changing
+ * and the journal is flushed to disk.
+ * <p>
+ * The application must call {@link #enableWriters} later.
+ * <p>
+ * This operation must not be nested (it will block).
+ *
+ * @see #tryBlockWriters()
+ * @see #enableWriters()
+ *
+ */
+ public void blockWriters() {
+     // Blocking acquire of the writer permit; the boolean result is ignored.
+     final boolean canBlock = true ;
+     acquireWriterLock(canBlock) ;
+ }
+
+ /** Try to block all writers, or return if can't at the moment.
+ * <p>
+ * Unlike a write transaction, there is no associated transaction.
+ * <p>
+ * If it returns true, the application must call {@link #enableWriters} later.
+ *
+ * @see #blockWriters()
+ * @see #enableWriters()
+
+ * @return true if the operation succeeded and writers are blocked
+ */
+ public boolean tryBlockWriters() {
+     // Non-blocking attempt.
+     final boolean canBlock = false ;
+     return tryBlockWriters(canBlock) ;
+ }
+
+ /**
+ * Block until no writers are active, optionally blocking or returning if can't at the moment.
+ * <p>
+ * Unlike a write transaction, there is no associated transaction.
+ * <p>
+ * If it returns true, the application must call {@link #enableWriters} later.
+ * @param canBlock
+ * @return true if the operation succeeded and writers are blocked
+ */
+ public boolean tryBlockWriters(boolean canBlock) {
+ return acquireWriterLock(canBlock) ;
+ }
+ /** Allow writers.
+ * This must be used in conjunction with {@link #blockWriters()} or {@link #tryBlockWriters()}
+ *
+ * @see #blockWriters()
+ * @see #tryBlockWriters()
+ */
+ public void enableWriters() {
+     // Hands back the writer permit; releaseWriterLock() throws if no permit is currently held.
+     releaseWriterLock() ;
+ }
+
+ /** Execute an action as if a writer, but with no write transaction started.
+ * This method can block.
+ * <p>
+ * Equivalent to:
+ * <pre>
+ * blockWriters() ;
+ * try { action.run(); }
+ * finally { enableWriters(); }
+ * </pre>
+ *
+ * @param action
+ */
+ public void execAsWriter(Runnable action) {
+ blockWriters() ;
+ try { action.run(); }
+ finally { enableWriters(); }
+ }
+
+ /** Start a transaction. This may block. */
+ public Transaction begin(ReadWrite readWrite) {
+     // Blocking form of begin.
+     final boolean canBlock = true ;
+     return begin(readWrite, canBlock) ;
+ }
+
+ /**
+ * Start a transaction. Returns null if this operation would block.
+ * Readers can start at any time.
+ * A single writer policy is currently imposed so a "begin(WRITE)"
+ * may block.
+ */
+ public Transaction begin(ReadWrite readWrite, boolean canBlock) {
+     // Fail fast on a null mode before any lock is taken.
+     // (Objects.nonNull merely returns a boolean, so it validated nothing.)
+     Objects.requireNonNull(readWrite, "readWrite") ;
+     checkActive() ;
+
+     // XXX Flag to bounce writers for long term "block writers"
+     if ( false /* bounceWritersAtTheMoment */) {
+         if ( readWrite == WRITE ) {
+             throw new TransactionException("Writers currently being rejected");
+         }
+     }
+
+     // Every transaction holds the read side of the exclusivity lock;
+     // it is released in finishActiveTransaction.
+     if ( canBlock )
+         exclusivitylock.readLock().lock() ;
+     else {
+         if ( ! exclusivitylock.readLock().tryLock() )
+             // Exclusive mode is in force and the caller asked not to wait.
+             return null ;
+     }
+
+     // Readers never block.
+     if ( readWrite == WRITE ) {
+         // Writers take a WRITE permit from the semaphore to ensure there
+         // is at most one active writer, else the attempt to start the
+         // transaction blocks.
+         // Released in notifyCommitFinish/notifyAbortFinish.
+         boolean b = acquireWriterLock(canBlock) ;
+         if ( !b ) {
+             // Could not become the writer without waiting: undo the read lock taken above.
+             exclusivitylock.readLock().unlock() ;
+             return null ;
+         }
+     }
+     Transaction transaction = begin$(readWrite) ;
+     startActiveTransaction(transaction) ;
+     transaction.begin();
+     return transaction;
+ }
+
+ // The version is the serialization point for a transaction.
+ // All transactions on the same view of the data get the same serialization point.
+
+ // A read transaction can be promoted if writer does not start
+ // This TransactionCoordinator provides Serializable, Read-lock-free
+ // execution. With no item locking, a read can only be promoted
+ // if no writer started since the reader started.
+
+ /* The version of the data - incremented when a transaction commits.
+ * This is the version with respect to the last committed transaction.
+ * Aborts do not cause the data version to advance.
+ * This counter never goes backwards.
+ */
+ private final AtomicLong dataVersion = new AtomicLong(0) ;
+
+ // Create the Transaction object and start it on each chosen component.
+ // Everything runs while holding coordinatorLock.
+ private Transaction begin$(ReadWrite readWrite) {
+ synchronized(coordinatorLock) {
+ // Thread safe part of 'begin'
+ // Allocate the transaction serialization point.
+ TxnId txnId = txnIdGenerator.generate() ;
+ // The list is handed to the Transaction now and filled in below.
+ List<SysTrans> sysTransList = new ArrayList<>() ;
+ Transaction transaction = new Transaction(this, txnId, readWrite, dataVersion.get(), sysTransList) ;
+
+ // Local variable: either the full component group or the quorum generator's choice.
+ ComponentGroup txnComponents = chooseComponents(this.components, readWrite) ;
+
+ try {
+ txnComponents.forEachComponent(elt -> {
+ SysTrans sysTrans = new SysTrans(elt, transaction, txnId) ;
+ sysTransList.add(sysTrans) ; }) ;
+ // Calling each component must be inside the lock
+ // so that a transaction does not commit overlapping with setup.
+ // If it did, different components might end up starting from
+ // different start states of the overall system.
+ txnComponents.forEachComponent(elt -> elt.begin(transaction)) ;
+ } catch(Throwable ex) {
+ // Currently a plain rethrow: no cleanup of partially-begun components is done here.
+ // Careful about incomplete.
+ //abort() ;
+ //complete() ;
+ throw ex ;
+ }
+ return transaction ;
+ }
+ }
+
+ private ComponentGroup chooseComponents(ComponentGroup components, ReadWrite readWrite) {
+     // Without a quorum generator, every transaction uses the full component group.
+     if ( quorumGenerator == null )
+         return components ;
+     ComponentGroup cg = quorumGenerator.genQuorum(readWrite) ;
+     // A null quorum also means "use the full group".
+     if ( cg == null )
+         return components ;
+     // Sanity check: each component in the quorum should be the one registered
+     // under the same id with this coordinator. Only a warning is logged.
+     cg.forEach((id, c) -> {
+         TransactionalComponent tcx = components.findComponent(id) ;
+         // findComponent can yield null (recover() checks for it as well); an unknown id
+         // is exactly the "not in the group" case, so warn instead of failing with an NPE.
+         if ( tcx == null || ! tcx.equals(c) )
+             log.warn("TransactionalComponent not in TransactionCoordinator's ComponentGroup") ;
+     }) ;
+     if ( log.isDebugEnabled() )
+         log.debug("Custom ComponentGroup for transaction "+readWrite+": size="+cg.size()+" of "+components.size()) ;
+     return cg ;
+ }
+
+ /** Is promotion of transactions enabled? */
+ /*private*/public/*for development*/ static boolean promotion = true ;
+
+ /** Control of whether a transaction promotion can see any commits that
+ * happened between this transaction starting and it promoting.
+ * A form of "ReadCommitted".
+ */
+ /*private*/public/*for development*/ static boolean readCommittedPromotion = false ;
+
+ /** Whether to wait for writers when trying to promote */
+ private static final boolean promotionWaitForWriters = true;
+
+ /** Attempt to promote a transaction from READ to WRITE.
+ * No-op (returning true) for a transaction that is already a writer.
+ * Returns false if the promotion can not be done.
+ * @return true if the transaction is a writer on return, otherwise false
+ */
+ /*package*/ boolean promoteTxn(Transaction transaction) {
+ // Promotion globally disabled.
+ if ( ! promotion )
+ return false;
+
+ // Already a writer: nothing to do.
+ if ( transaction.getMode() == WRITE )
+ return true ;
+
+ // Has there been a writer active since the transaction started?
+ // Do a test outside the lock - only dataVersion can change and that increases.
+ // If "read committed transactions" are not allowed, the data has changed in a way we
+ // do not wish to expose.
+ // If this test fails outside the lock it will fail inside.
+ // If it passes, we have to test again in case there is an active writer.
+
+ if ( ! readCommittedPromotion ) {
+ long txnEpoch = transaction.getDataVersion() ; // The transaction-start point.
+ long currentEpoch = dataVersion.get() ; // The data serialization point.
+
+ if ( txnEpoch < currentEpoch )
+ // The data has changed and "read committed" not allowed.
+ // We can reject now.
+ return false ;
+ }
+
+ // Once we have acquireWriterLock, we are single writer.
+ // We may have to discard writer status because once we can make the definite
+ // decision on promotion, we find we can't promote after all.
+ if ( readCommittedPromotion ) {
+ /*
+ * acquireWriterLock(true) ;
+ * synchronized(coordinatorLock) {
+ * begin$ ==>
+ * reset transaction.
+ * promote components
+ * reset dataVersion
+ */
+ // Always waits for the writer permit in this mode.
+ acquireWriterLock(true) ;
+ synchronized(coordinatorLock) {
+ try {
+ transaction.promoteComponents() ;
+ // Because we want to see the new state of the data.
+ //transaction.resetDataVersion(dataVersion.get());
+ } catch (TransactionException ex) {
+ // Promotion of the components failed: abort (ignoring any further failure),
+ // give back the writer permit and report failure.
+ try { transaction.abort(); } catch(RuntimeException ex2) {}
+ releaseWriterLock();
+ return false ;
+ }
+ }
+ return true;
+ }
+
+ if ( ! waitForWriters() )
+ // Failed to become a writer.
+ return false;
+ // Now a proto-writer.
+
+ synchronized(coordinatorLock) {
+ // Not read committed.
+ // Need to check the data version once we are the writer and all previous
+ // writers have committed or aborted.
+ // Has there been a writer active since the transaction started?
+ long txnEpoch = transaction.getDataVersion() ; // The transaction-start point.
+ long currentEpoch = dataVersion.get() ; // The data serialization point.
+
+ if ( txnEpoch != currentEpoch ) {
+ // Failed to promote.
+ releaseWriterLock();
+ return false ;
+ }
+
+ // ... we have now got the writer lock ...
+ try {
+ transaction.promoteComponents() ;
+ // No need to reset the data version because strict isolation.
+ } catch (TransactionException ex) {
+ // Same failure handling as in the read-committed branch above.
+ try { transaction.abort(); } catch(RuntimeException ex2) {}
+ releaseWriterLock();
+ return false ;
+ }
+ }
+ return true ;
+ }
+
+ private boolean waitForWriters() {
+     // The promotionWaitForWriters constant decides whether acquiring the writer permit may block.
+     return acquireWriterLock(promotionWaitForWriters) ;
+ }
+
+ // Called once by Transaction after the action of commit()/abort() or end()
+ /** Signal that the transaction has finished. */
+ /*package*/ void completed(Transaction transaction) {
+     // Remove the transaction from the active-transaction tracking first ...
+     finishActiveTransaction(transaction) ;
+     // ... then reset the journal.
+     journal.reset() ;
+ }
+
+ /*package*/ void executePrepare(Transaction transaction) {
+     // Done by the coordinator because it needs access to the journal.
+     notifyPrepareStart(transaction) ;
+     transaction.getComponents().forEach(sysTrans -> {
+         TransactionalComponent component = sysTrans.getComponent() ;
+         ByteBuffer prepared = component.commitPrepare(transaction) ;
+         // A component that returns null has nothing to record in the journal.
+         if ( prepared == null )
+             return ;
+         PrepareState state = new PrepareState(component.getComponentId(), prepared) ;
+         journal.write(state) ;
+     }) ;
+     notifyPrepareFinish(transaction) ;
+ }
+
+ // Run the commit step for a transaction while holding coordinatorLock.
+ // 'commit' and 'finish' are callbacks supplied by the caller; they run in that order,
+ // after the journal has been synced.
+ /*package*/ void executeCommit(Transaction transaction, Runnable commit, Runnable finish) {
+ // This is the commit point.
+ synchronized(coordinatorLock) {
+ // *** COMMIT POINT
+ journal.sync() ;
+ // *** COMMIT POINT
+ // Now run the Transactions commit actions.
+ commit.run() ;
+ // The journal is truncated to length 0 once the commit actions have run.
+ journal.truncate(0) ;
+ // and tell the Transaction it's finished.
+ finish.run() ;
+ // Bump global serialization point if necessary.
+ if ( transaction.getMode() == WRITE )
+ advanceDataVersion() ;
+ // For a WRITE transaction this releases the writer permit.
+ // NOTE(review): there is no try/finally here, so if a step above throws,
+ // notifyCommitFinish is not reached from this method - confirm callers handle that.
+ notifyCommitFinish(transaction) ;
+ }
+ }
+
+ // Inside the global transaction start/commit lock.
+ private void advanceDataVersion() {
+     // Move the data version on by one; the returned value is not needed.
+     dataVersion.getAndIncrement() ;
+ }
+
+ /*package*/ void executeAbort(Transaction transaction, Runnable abort) {
+     // Bracket the caller-supplied abort action with the abort notifications;
+     // notifyAbortFinish releases the writer permit for a WRITE transaction.
+     notifyAbortStart(transaction) ;
+     abort.run() ;
+     notifyAbortFinish(transaction) ;
+ }
+
+ // Active transactions: this is (the missing) ConcurrentHashSet
+ private final static Object dummy = new Object() ;
+ private ConcurrentHashMap<Transaction, Object> activeTransactions = new ConcurrentHashMap<>() ;
+ private AtomicLong activeTransactionCount = new AtomicLong(0) ;
+ private AtomicLong activeReadersCount = new AtomicLong(0) ;
+ private AtomicLong activeWritersCount = new AtomicLong(0) ;
+
+ private void startActiveTransaction(Transaction transaction) {
+     synchronized(coordinatorLock) {
+         // Use lock to ensure all the counters move together.
+         // Thread safe - we have not let the Transaction object out yet.
+         countBegin.incrementAndGet() ;
+         switch(transaction.getMode()) {
+             case READ:
+                 countBeginRead.incrementAndGet() ;
+                 activeReadersCount.incrementAndGet() ;
+                 break ;
+             case WRITE:
+                 countBeginWrite.incrementAndGet() ;
+                 activeWritersCount.incrementAndGet() ;
+                 break ;
+         }
+         activeTransactionCount.incrementAndGet() ;
+         // The map is used as a set: the value is a shared placeholder object.
+         activeTransactions.put(transaction, dummy) ;
+     }
+ }
+
+ private void finishActiveTransaction(Transaction transaction) {
+     synchronized(coordinatorLock) {
+         // Idempotent: a transaction that is no longer tracked changes nothing
+         // (and the read lock below is not touched either).
+         if ( activeTransactions.remove(transaction) == null )
+             return ;
+         countFinished.incrementAndGet() ;
+         activeTransactionCount.decrementAndGet() ;
+         switch(transaction.getMode()) {
+             case READ:
+                 activeReadersCount.decrementAndGet() ;
+                 break ;
+             case WRITE:
+                 activeWritersCount.decrementAndGet() ;
+                 break ;
+         }
+     }
+     // Give back the read side of the exclusivity lock taken in begin.
+     exclusivitylock.readLock().unlock() ;
+ }
+
+ public long countActiveReaders() { return activeReadersCount.get() ; }
+ public long countActiveWriter() { return activeWritersCount.get() ; }
+ public long countActive() { return activeTransactionCount.get(); }
+
+ // notify*Start/Finish called round each transaction lifecycle step
+ // Called in cooperation between Transaction and TransactionCoordinator
+ // depending on who is actually do the work of each step.
+
+ // No-op hook: called at the start of executePrepare.
+ /*package*/ void notifyPrepareStart(Transaction transaction) {}
+
+ // No-op hook: called at the end of executePrepare.
+ /*package*/ void notifyPrepareFinish(Transaction transaction) {}
+
+ // Writers released here - can happen because of commit() or abort().
+
+ // No-op hook. NOTE(review): private and not invoked by executeCommit in this class - confirm whether it is still needed.
+ private void notifyCommitStart(Transaction transaction) {}
+
+ private void notifyCommitFinish(Transaction transaction) {
+     // Only a WRITE transaction gives back the writer permit here.
+     boolean wasWriter = ( transaction.getMode() == WRITE ) ;
+     if ( wasWriter )
+         releaseWriterLock() ;
+ }
+
+ // No-op hook: called by executeAbort before the abort action runs.
+ private void notifyAbortStart(Transaction transaction) { }
+
+ private void notifyAbortFinish(Transaction transaction) {
+     // Only a WRITE transaction gives back the writer permit here.
+     boolean wasWriter = ( transaction.getMode() == WRITE ) ;
+     if ( wasWriter )
+         releaseWriterLock() ;
+ }
+
+ // No-op hook; not called from within this class.
+ /*package*/ void notifyEndStart(Transaction transaction) { }
+
+ // No-op hook; not called from within this class.
+ /*package*/ void notifyEndFinish(Transaction transaction) {}
+
+ // Called by Transaction once at the end of first commit()/abort() or end()
+
+ // No-op hook; not called from within this class.
+ /*package*/ void notifyCompleteStart(Transaction transaction) { }
+
+ // No-op hook; not called from within this class.
+ /*package*/ void notifyCompleteFinish(Transaction transaction) { }
+
+ // Coordinator state.
+ private final AtomicLong countBegin = new AtomicLong(0) ;
+
+ private final AtomicLong countBeginRead = new AtomicLong(0) ;
+
+ private final AtomicLong countBeginWrite = new AtomicLong(0) ;
+
+ private final AtomicLong countFinished = new AtomicLong(0) ;
+
+ // Access counters
+ public long countBegin() { return countBegin.get() ; }
+
+ public long countBeginRead() { return countBeginRead.get() ; }
+
+ public long countBeginWrite() { return countBeginWrite.get() ; }
+
+ public long countFinished() { return countFinished.get() ; }
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinatorState.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinatorState.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinatorState.java
new file mode 100644
index 0000000..5f0755f
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinatorState.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.util.HashMap ;
+import java.util.Map ;
+
+/**
+ * Holder pairing a {@link Transaction} with per-component state objects,
+ * keyed by {@link ComponentId}.
+ */
+public class TransactionCoordinatorState {
+    /*package*/ final Transaction transaction ;
+    /*package*/ Map<ComponentId, SysTransState> componentStates = new HashMap<>() ;
+
+    /*package*/ TransactionCoordinatorState(Transaction transaction) {
+        this.transaction = transaction ;
+    }
+
+    /** The transaction this state belongs to. */
+    public Transaction getTransaction() {
+        return this.transaction ;
+    }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionException.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionException.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionException.java
new file mode 100644
index 0000000..2891cf7
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import org.apache.jena.sparql.JenaTransactionException ;
+
+public class TransactionException extends JenaTransactionException {
+ public TransactionException() { super(); }
+ public TransactionException(String message) { super(message); }
+ public TransactionException(Throwable cause) { super(cause) ; }
+ public TransactionException(String message, Throwable cause) { super(message, cause) ; }
+
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionInfo.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionInfo.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionInfo.java
new file mode 100644
index 0000000..e0ab792
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionInfo.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import static org.apache.jena.dboe.transaction.txn.TxnState.INACTIVE;
+
+import org.apache.jena.query.ReadWrite ;
+
+/**
+ * A view that provides information about a transaction.
+ *
+ * @see Transaction
+ */
+public interface TransactionInfo {
+
+    /** The transaction lifecycle state. */
+    TxnState getState() ;
+
+    /**
+     * The data version of this transaction.
+     * <p>
+     * Each transaction is allocated a serialization point by the transaction
+     * coordinator. Normally, the serialization point is related to this number,
+     * which increases over time as the data changes. Two readers can have the
+     * same serialization point - they are working with the same view of the data.
+     */
+    long getDataVersion() ;
+
+    /** Has the transaction started? */
+    boolean hasStarted() ;
+
+    /** Has the transaction finished (has commit/abort/end been called)? */
+    boolean hasFinished() ;
+
+    /** Has the transaction gone through all lifecycle states? */
+    boolean hasFinalised() ;
+
+    /** Get the transaction id for this transaction. Unique within this OS process (JVM) at least. */
+    TxnId getTxnId() ;
+
+    /**
+     * What mode is this transaction?
+     * This may change from {@code READ} to {@code WRITE} in a transaction's lifetime.
+     */
+    ReadWrite getMode() ;
+
+    /**
+     * Is this a view of a READ transaction?
+     * Convenience operation equivalent to {@code (getMode() == READ)}.
+     */
+    default boolean isReadTxn() {
+        return ReadWrite.READ == getMode() ;
+    }
+
+    /**
+     * Is this a view of a WRITE transaction?
+     * Convenience operation equivalent to {@code (getMode() == WRITE)}.
+     */
+    default boolean isWriteTxn() {
+        return ReadWrite.WRITE == getMode() ;
+    }
+
+    /**
+     * Is this a view of a transaction that is active?
+     * Equivalent to {@code getState() != INACTIVE}.
+     */
+    default boolean isActiveTxn() {
+        return INACTIVE != getState() ;
+    }
+
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java
new file mode 100644
index 0000000..30364d0
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.util.Objects ;
+
+import org.apache.jena.atlas.logging.Log ;
+import org.apache.jena.query.ReadWrite ;
+
+/**
+ * Framework for implementing a Transactional.
+ * <p>
+ * The transaction of the calling thread is held in a {@link ThreadLocal};
+ * lifecycle operations are passed on to the {@link TransactionCoordinator}
+ * and to the thread's {@link Transaction}.
+ */
+public class TransactionalBase implements TransactionalSystem {
+    // Help debugging by generating names for Transactionals.
+    private final String label ;
+    protected boolean isShutdown = false ;
+    protected final TransactionCoordinator txnMgr ;
+
+    // Per thread transaction.
+    private final ThreadLocal<Transaction> theTxn = new ThreadLocal<>() ;
+
+    /**
+     * Create a labelled instance.
+     *
+     * @param label  name used to prefix messages built by {@link #label(String)};
+     *               may be {@code null} for no prefix
+     * @param txnMgr the coordinator that transaction operations are passed to
+     */
+    public TransactionalBase(String label, TransactionCoordinator txnMgr) {
+        this.label = label ;
+        this.txnMgr = txnMgr ;
+    }
+
+    /**
+     * Create an instance with no label.
+     *
+     * @param txnMgr the coordinator that transaction operations are passed to
+     */
+    public TransactionalBase(TransactionCoordinator txnMgr) {
+        this(null, txnMgr) ;
+    }
+
+    /** Return the coordinator given at construction. */
+    @Override
+    public TransactionCoordinator getTxnMgr() {
+        return txnMgr ;
+    }
+
+    // Development
+    private static final boolean trackAttachDetach = false ;
+
+    /**
+     * Detach the calling thread from its transaction and return the
+     * coordinator state for a later {@link #attach}.
+     * The thread-local is cleared whether or not a transaction was present.
+     *
+     * @return the state returned by the coordinator for the detached transaction
+     * @throws TransactionException if shut down, or if no coordinator state was
+     *         obtained (message "Not attached"), e.g. the thread has no transaction
+     */
+    @Override
+    public TransactionCoordinatorState detach() {
+        if ( trackAttachDetach )
+            Log.info(this, ">> detach");
+        checkRunning() ;
+        // Not if it just committed but before end.
+        //checkActive() ;
+        Transaction txn = theTxn.get() ;
+        TransactionCoordinatorState coordinatorState = null ;
+        if ( txn != null )
+            // We are not ending.
+            coordinatorState = txnMgr.detach(txn) ;
+        if ( trackAttachDetach )
+            Log.info(this, "  theTxn = "+txn) ;
+        // Always clear the thread-local, even when there was no transaction:
+        // the get() above may have created an entry for this thread.
+        theTxn.remove() ;
+        if ( trackAttachDetach )
+            Log.info(this, "<< detach");
+        if ( coordinatorState == null )
+            throw new TransactionException("Not attached") ;
+        return coordinatorState ;
+    }
+
+    /**
+     * Attach the calling thread to a previously detached transaction.
+     *
+     * @param coordinatorState state obtained from {@link #detach()}; must not be {@code null}
+     * @throws NullPointerException if {@code coordinatorState} is {@code null}
+     * @throws TransactionException if shut down, if the thread is already in a
+     *         transaction, or if the transaction is not in state {@code DETACHED}
+     */
+    @Override
+    public void attach(TransactionCoordinatorState coordinatorState) {
+        if ( trackAttachDetach )
+            Log.info(this, ">> attach");
+        // Objects.nonNull only returns a boolean; requireNonNull actually enforces the check.
+        Objects.requireNonNull(coordinatorState, "coordinatorState") ;
+        checkRunning() ;
+        checkNotActive() ;
+        TxnState txnState = coordinatorState.transaction.getState() ;
+        if ( txnState != TxnState.DETACHED )
+            throw new TransactionException("Not a detached transaction") ;
+        txnMgr.attach(coordinatorState) ;
+        if ( trackAttachDetach )
+            Log.info(this, "  theTxn = "+coordinatorState.transaction) ;
+        theTxn.set(coordinatorState.transaction);
+        if ( trackAttachDetach )
+            Log.info(this, "<< attach");
+    }
+
+    /**
+     * Begin a transaction on the calling thread.
+     *
+     * @param readWrite the transaction mode; must not be {@code null}
+     * @throws NullPointerException if {@code readWrite} is {@code null}
+     * @throws TransactionException if shut down or if the thread is already in a transaction
+     */
+    @Override
+    public final void begin(ReadWrite readWrite) {
+        // Objects.nonNull only returns a boolean; requireNonNull actually enforces the check.
+        Objects.requireNonNull(readWrite, "readWrite") ;
+        checkRunning() ;
+        checkNotActive() ;
+        Transaction transaction = txnMgr.begin(readWrite) ;
+        theTxn.set(transaction) ;
+    }
+
+    /**
+     * Ask the calling thread's transaction to promote itself.
+     *
+     * @return the result of {@code Transaction.promote()}
+     * @throws TransactionException if shut down or not in a transaction
+     */
+    @Override
+    public boolean promote() {
+        checkActive() ;
+        Transaction txn = getValidTransaction() ;
+        return txn.promote() ;
+    }
+
+    /**
+     * Commit, using the default implementation from {@link TransactionalSystem}
+     * after checking this object has not been shut down.
+     */
+    @Override
+    public final void commit() {
+        checkRunning() ;
+        TransactionalSystem.super.commit() ;
+    }
+
+    /**
+     * Run the prepare step on the calling thread's transaction.
+     *
+     * @throws TransactionException if the thread has no transaction
+     */
+    @Override
+    public void commitPrepare() {
+        Transaction txn = getValidTransaction() ;
+        txn.prepare() ;
+    }
+
+    /**
+     * Commit the calling thread's transaction and then end it.
+     *
+     * @throws TransactionException if the thread has no transaction
+     */
+    @Override
+    public void commitExec() {
+        Transaction txn = getValidTransaction() ;
+        txn.commit() ;
+        _end() ;
+    }
+
+//    /** Signal end of commit phase */
+//    @Override
+//    public void commitEnd() {
+//        _end() ;
+//    }
+
+    /**
+     * Abort the calling thread's transaction; the transaction is ended even
+     * if the abort itself throws.
+     *
+     * @throws TransactionException if shut down or not in a transaction
+     */
+    @Override
+    public final void abort() {
+        checkRunning() ;
+        checkActive() ;
+        Transaction txn = getValidTransaction() ;
+        try { txn.abort() ; }
+        finally { _end() ; }
+    }
+
+    /**
+     * End the calling thread's transaction, if any. Safe to call repeatedly.
+     *
+     * @throws TransactionException if shut down
+     */
+    @Override
+    public final void end() {
+        checkRunning() ;
+        // Don't check if active or if any thread locals exist
+        // because this may have already been called.
+        // txn.get() ; -- may be null -- test repeat calls.
+        _end() ;
+    }
+
+    /**
+     * Return the Read/write state (or null when not in a transaction)
+     *
+     * @throws TransactionException if shut down
+     */
+    @Override
+    public final ReadWrite getState() {
+        checkRunning() ;
+        // tricky - touching theTxn causes it to initialize.
+        Transaction txn = theTxn.get() ;
+        if ( txn != null )
+            return txn.getMode() ;
+        // Undo the entry created by get() for a thread with no transaction.
+        theTxn.remove() ;
+        return null ;
+    }
+
+    /** Return the calling thread's transaction as a {@link TransactionInfo}; may be {@code null}. */
+    @Override
+    public final TransactionInfo getTransactionInfo() {
+        return getThreadTransaction() ;
+    }
+
+    /** Return the calling thread's transaction, or {@code null} if there is none. */
+    @Override
+    public final Transaction getThreadTransaction() {
+        Transaction txn = theTxn.get() ;
+        // Touched the thread local so it is defined now.
+//        if ( txn == null )
+//            theTxn.remove() ;
+        return txn ;
+    }
+
+    /** Get the transaction, checking there is one */
+    private Transaction getValidTransaction() {
+        Transaction txn = theTxn.get() ;
+        if ( txn == null )
+            throw new TransactionException("Not in a transaction") ;
+        return txn ;
+    }
+
+    /** Throw {@link TransactionException} if {@link #shutdown()} has been called. */
+    private void checkRunning() {
+//        if ( ! hasStarted )
+//            throw new TransactionException("Not started") ;
+
+        if ( isShutdown )
+            throw new TransactionException("Shutdown") ;
+    }
+
+    /**
+     * Shutdown component, aborting any in-progress transactions. This operation
+     * is not guaranteed to be called.
+     */
+    public void shutdown() {
+        txnMgr.shutdown() ;
+        isShutdown = true ;
+    }
+
+    /**
+     * Prefix {@code msg} with this object's label, if it has one.
+     *
+     * @param msg the message text
+     * @return {@code msg} unchanged when there is no label, otherwise {@code label+": "+msg}
+     */
+    protected String label(String msg) {
+        if ( label == null )
+            return msg ;
+        return label+": "+msg ;
+    }
+
+    /**
+     * Check the calling thread is in a transaction.
+     *
+     * @throws TransactionException if shut down or not in a transaction
+     */
+    protected final void checkActive() {
+        checkNotShutdown() ;
+        if ( ! isInTransaction() )
+            throw new TransactionException(label("Not in an active transaction")) ;
+    }
+
+    /**
+     * Check the calling thread is not in a transaction.
+     *
+     * @throws TransactionException if shut down or already in a transaction
+     */
+    protected final void checkNotActive() {
+        checkNotShutdown() ;
+        if ( isInTransaction() )
+            throw new TransactionException(label("Currently in an active transaction")) ;
+    }
+
+    /**
+     * Check this object has not been shut down.
+     *
+     * @throws TransactionException if {@link #shutdown()} has been called
+     */
+    protected final void checkNotShutdown() {
+        if ( isShutdown )
+            throw new TransactionException(label("Already shutdown")) ;
+    }
+
+    /**
+     * End the calling thread's transaction, if there is one, and clear the
+     * thread-local even when ending the transaction throws.
+     */
+    private final void _end() {
+        Transaction txn = theTxn.get() ;
+        if ( txn != null ) {
+            try {
+                // Can throw an exception on begin(W)...end().
+                txn.end() ;
+            } finally {
+                // remove() alone discards the thread's entry; no need to set(null) first.
+                theTxn.remove() ;
+            }
+        }
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponent.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponent.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponent.java
new file mode 100644
index 0000000..46253fe
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponent.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+
+/**
+ * Interface for components of a transaction system.
+ * <p>
+ * The {@link TransactionCoordinator} manages a number of components
+ * which provide the {@link TransactionalComponent} interface.
+ * <p>
+ * When a new coordinator starts, typically when the in-process system starts,
+ * there is a recovery phase when work from a previous coordinator is recovered.
+ * Transactions either were properly committed by the previous coordinator,
+ * and hence redo actions (finalization) should be done,
+ * or they were not, in which case undo actions may be needed.
+ * Transactions to discard are not notified; only fully committed transactions are
+ * notified during recovery. The component may need to keep its own record of
+ * undo actions needed across restarts.
+ * <p>
+ * Lifecycle of startup:
+ * <ul>
+ * <li>{@link #startRecovery}
+ * <li>{@link #recover} for each committed/durable transaction (redo actions)
+ * <li>{@link #finishRecovery}, discarding any other transactions (undo actions).
+ * </ul>
+ * <p>
+ * Lifecycle of a read transaction:
+ * <ul>
+ * <li>{@link #begin}
+ * <li>{@link #complete}
+ * </ul>
+ * A read transaction may also include {@code commit} or {@code abort} lifecycles.
+ * {@link #commitPrepare} and {@link #commitEnd} are not called.
+ * <p>
+ * Lifecycle of a write transaction:
+ * <ul>
+ * <li>{@link #begin}
+ * <li>{@link #commitPrepare}
+ * <li>{@link #commit} or {@link #abort}
+ * <li>{@link #commitEnd}
+ * <li>{@link #complete} including abort
+ * </ul>
+ * or if the application aborts the transaction:
+ * <ul>
+ * <li>{@link #begin}
+ * <li>{@link #abort}
+ * <li>{@link #complete}
+ * </ul>
+ * <p>
+ * {@link #complete} may be called out of sequence and it forces an abort if before
+ * {@link #commitPrepare}. Once {@link #commitPrepare} has been called, the component
+ * can not decide whether to commit finally or to cause a system abort; it must wait
+ * for the coordinator. After {@link #commitEnd}, the coordinator has definitely
+ * committed the overall transaction and local prepared state can be released, and changes
+ * made to the permanent state of the component.
+ *
+ * @see Transaction
+ * @see TransactionCoordinator
+ */
+public interface TransactionalComponent {
+
+    /**
+     * Every component <i>instance</i> must supply a unique number.
+     * It is used to route journal entries to subsystems, including across restarts/recovery.
+     * Uniqueness scope is within the same {@link TransactionCoordinator},
+     * and the same across restarts.
+     * <p>
+     * If a component imposes the rule of one-per-{@link TransactionCoordinator},
+     * the same number can be used (if different from all other component type instances).
+     * <p>
+     * If a component can have multiple instances per {@link TransactionCoordinator},
+     * for example indexes, each must have a unique instance id.
+     */
+    ComponentId getComponentId() ;
+
+    // ---- Recovery phase
+
+    /** Start of the recovery phase. */
+    void startRecovery() ;
+
+    /**
+     * Notification that {@code ref} was really committed and is being recovered.
+     *
+     * @param ref Same bytes as were written during prepare originally.
+     */
+    void recover(ByteBuffer ref) ;
+
+    /** End of the recovery phase. */
+    void finishRecovery() ;
+
+    /** Indicate that no recovery is being done (the journal thinks everything was completed last time). */
+    void cleanStart() ;
+
+    // ---- Normal operation
+
+    /** Start a transaction for this component. */
+    void begin(Transaction transaction) ;
+
+    /**
+     * Promote a component in a transaction.
+     * <p>
+     * May return "false" for "can't do that" if the transaction can not be promoted.
+     * <p>
+     * May throw {@link UnsupportedOperationException} if promotion is not supported.
+     */
+    boolean promote(Transaction transaction) ;
+
+    /**
+     * Prepare for a commit.
+     * Returns some bytes that will be written to the journal.
+     * The journal remains valid until {@link #commitEnd} is called.
+     */
+    ByteBuffer commitPrepare(Transaction transaction) ;
+
+    /**
+     * Commit a transaction (make durable).
+     * Other components may not have been committed yet and recovery may still occur.
+     * Permanent state should not be finalised until {@link #commitEnd}.
+     */
+    void commit(Transaction transaction) ;
+
+    /** Signal all commits on all components are done (the component can clear up now). */
+    void commitEnd(Transaction transaction) ;
+
+    /** Abort a transaction (undo the effect of a transaction). */
+    void abort(Transaction transaction) ;
+
+    /**
+     * Finalization - the coordinator will not mention the transaction again
+     * although recovery after a crash may do so.
+     */
+    void complete(Transaction transaction) ;
+
+    // ---- End of operations
+
+    /**
+     * Detach this component from the transaction of the current thread
+     * and return some internal state that can be used in a future call of
+     * {@link #attach(SysTransState)}.
+     * <p>
+     * After this call, the component is not in a transaction but the
+     * existing transaction still exists. The thread may start a new
+     * transaction; that transaction is completely independent of the
+     * detached transaction.
+     * <p>
+     * Returns {@code null} if the current thread is not in a transaction.
+     * The component may return null to indicate it has no state.
+     * The returned system state should be used in a call to {@link #attach(SysTransState)}
+     * and the transaction ended in the usual way.
+     */
+    SysTransState detach() ;
+
+    /**
+     * Set the current thread to be in the transaction. The {@code systemState}
+     * must be obtained from a call of {@link #detach()}.
+     * This method can only be called once per {@code systemState}.
+     */
+    void attach(SysTransState systemState) ;
+
+    /**
+     * Shutdown component, aborting any in-progress transactions.
+     * This operation is not guaranteed to be called.
+     */
+    void shutdown() ;
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentBase.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentBase.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentBase.java
new file mode 100644
index 0000000..75f1b1b
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentBase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+
+import org.apache.jena.query.ReadWrite ;
+
+/**
+ * A transaction component that does nothing. It can be used as a helper for
+ * management tasks that hook into the transaction component lifecycle but
+ * are not stateful across restarts.
+ * <p>
+ * Every lifecycle method here is a no-op; those that return a value return
+ * {@code null}.
+ */
+public class TransactionalComponentBase<X> extends TransactionalComponentLifecycle<X> {
+
+    /** Create a component identified by {@code id}. */
+    public TransactionalComponentBase(ComponentId id) {
+        super(id) ;
+    }
+
+    // ---- Recovery phase: nothing to do.
+
+    /** No-op. */
+    @Override
+    public void startRecovery() {
+    }
+
+    /** No-op; {@code ref} is ignored. */
+    @Override
+    public void recover(ByteBuffer ref) {
+    }
+
+    /** No-op. */
+    @Override
+    public void finishRecovery() {
+    }
+
+    /** No-op. */
+    @Override
+    public void cleanStart() {
+    }
+
+    // ---- Transaction lifecycle: nothing to do.
+
+    /** No per-transaction state is created; always returns {@code null}. */
+    @Override
+    protected X _begin(ReadWrite readWrite, TxnId txnId) {
+        return null ;
+    }
+
+    /** No bytes are produced at prepare time; always returns {@code null}. */
+    @Override
+    protected ByteBuffer _commitPrepare(TxnId txnId, X state) {
+        return null ;
+    }
+
+    /** No-op. */
+    @Override
+    protected void _commit(TxnId txnId, X state) {
+    }
+
+    /** No-op. */
+    @Override
+    protected void _commitEnd(TxnId txnId, X state) {
+    }
+
+    /** No-op. */
+    @Override
+    protected void _abort(TxnId txnId, X state) {
+    }
+
+    /** No-op. */
+    @Override
+    protected void _complete(TxnId txnId, X state) {
+    }
+
+    /** No-op. */
+    @Override
+    protected void _shutdown() {
+    }
+
+    /** Always returns {@code null}. */
+    @Override
+    protected X _promote(TxnId txnId, X state) {
+        return null ;
+    }
+
+}
+