You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2017/10/03 19:34:12 UTC
[26/65] [abbrv] jena git commit: JENA-1397: Rename java packages
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentLifecycle.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentLifecycle.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentLifecycle.java
new file mode 100644
index 0000000..3374b14
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentLifecycle.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import static org.apache.jena.dboe.transaction.txn.TxnState.*;
+
+import java.nio.ByteBuffer ;
+import java.util.Objects ;
+
+import org.apache.jena.query.ReadWrite ;
+
+import org.apache.jena.atlas.lib.InternalErrorException ;
+
+/**
+ * Base implementation of the component interface for {@link TransactionalComponent}.
+ */
+public abstract class TransactionalComponentLifecycle<X> implements TransactionalComponent {
+
+ // Pass down recovery operations.
+ // This class has no transaction-recorded state.
+ @Override public abstract void startRecovery();
+ @Override public abstract void recover(ByteBuffer ref);
+ @Override public abstract void finishRecovery();
+
+ // ---- Normal operation
+
+ private static final boolean CHECKING = false ;
+ private ThreadLocal<TxnState> trackTxn = CHECKING ? ThreadLocal.withInitial(() -> INACTIVE) : null ;
+
+ // Access to these two must be via the getter/setters below only.
+ // Allows stuff for thread switching.
+ private ThreadLocal<Transaction> threadTxn = new ThreadLocal<>() ;
+ private ThreadLocal<X> componentState = new ThreadLocal<>() ;
+ private final ComponentId componentId ;
+
+ protected TransactionalComponentLifecycle(ComponentId componentId) {
+ this.componentId = componentId ;
+ }
+
+ @Override
+ public ComponentId getComponentId() {
+ return componentId ;
+ }
+
+// // Very dangerous!
+// protected void setForThread(Transaction txn, X state) {
+// threadTxn.set(txn);
+// componentState.set(state);
+// }
+
+ /** Start a transaction */
+ @Override
+ final
+ public void begin(Transaction transaction) {
+ Objects.requireNonNull(transaction) ;
+ setTransaction(transaction);
+ checkState(INACTIVE, COMMITTED, ABORTED) ;
+ setTrackTxn(ACTIVE);
+ X x = _begin(transaction.getMode(), transaction.getTxnId()) ;
+ setDataState(x);
+ }
+
+ /** Promote a component in a transaction */
+ @Override
+ final
+ public boolean promote(Transaction transaction) {
+ Objects.requireNonNull(transaction) ;
+ checkState(ACTIVE) ;
+ X newState = _promote(transaction.getTxnId(), getDataState());
+ if ( newState == null )
+ return false;
+ setDataState(newState);
+ return true;
+ }
+
+ /** Commit a transaction (make durable): prepare - commit - cleanup */
+ @Override
+ final
+ public ByteBuffer commitPrepare(Transaction transaction) {
+ checkAligned(transaction) ;
+ checkState(ACTIVE) ;
+ try { return _commitPrepare(transaction.getTxnId(), getDataState()) ; }
+ finally { setTrackTxn(PREPARE) ; }
+ }
+
+ @Override
+ final
+ public void commit(Transaction transaction) {
+ checkAligned(transaction) ;
+ checkState(PREPARE) ;
+ _commit(transaction.getTxnId(), getDataState());
+ setTrackTxn(COMMIT) ;
+ }
+
+ @Override
+ final
+ public void commitEnd(Transaction transaction) {
+ checkAligned(transaction) ;
+ checkState(COMMIT) ;
+ _commitEnd(transaction.getTxnId(), getDataState());
+ setTrackTxn(COMMITTED) ;
+ internalComplete(transaction) ;
+ }
+
+ @Override
+ final
+ public void abort(Transaction transaction) {
+ checkAligned(transaction) ;
+ checkState(ACTIVE, PREPARE, COMMIT) ;
+ _abort(transaction.getTxnId(), getDataState()) ;
+ setTrackTxn(ABORTED) ;
+ internalComplete(transaction) ;
+ }
+
+ private void internalComplete(Transaction transaction) {
+ _complete(transaction.getTxnId(), getDataState());
+ setTrackTxn(INACTIVE) ;
+ releaseThreadState() ;
+ }
+
+ @Override
+ final
+ public void complete(Transaction transaction) {
+ if ( transaction.hasFinished() )
+ return ;
+ checkAligned(transaction) ;
+ ReadWrite m = getReadWriteMode() ;
+ switch(m) {
+ case READ:
+ checkState(ACTIVE, COMMITTED, ABORTED) ;
+ break ;
+ case WRITE:
+ // If bad, force abort?
+ checkState(COMMITTED, ABORTED) ;
+ break ;
+ }
+ _complete(transaction.getTxnId(), getDataState());
+ switch(m) {
+ case READ:
+ internalComplete(transaction);
+ break ;
+ case WRITE:
+ // complete happened in the commit or abort.
+ break ;
+ }
+ }
+
+ @Override
+ final
+ public void shutdown() {
+ _shutdown() ;
+ clearInternal() ;
+ }
+
+ @Override
+ final public SysTransState detach() {
+ TxnState txnState = getTxnState() ;
+ if ( txnState == null )
+ return null ;
+ checkState(ACTIVE) ;
+ setTrackTxn(DETACHED) ;
+ SysTransState transState = new SysTransState(this, getTransaction(), getDataState()) ;
+ //****** Thread locals
+ releaseThreadState() ;
+ return transState ;
+ }
+
+ @Override
+ public void attach(SysTransState state) {
+ @SuppressWarnings("unchecked")
+ X x = (X)state.getState() ;
+// // reset to not thread not in
+// if ( CHECKING )
+// trackTxn : ThreadLocal<TxnState>
+//
+//
+ setTransaction(state.getTransaction());
+ setDataState(x);
+ setTrackTxn(ACTIVE) ;
+ }
+
+ // -- Access object members.
+
+ public static class ComponentState<X> {
+ final TxnState state;
+ final Transaction txn ;
+ final X componentState ;
+ ComponentState(TxnState state, Transaction txn, X componentState) {
+ super() ;
+ this.state = state ;
+ this.txn = txn ;
+ this.componentState = componentState ;
+ }
+ }
+
+ public ComponentState<X> getComponentState() {
+ return new ComponentState<>(getTrackTxn(), getTransaction(), getDataState()) ;
+ }
+
+ public void setComponentState(ComponentState<X> state) {
+ setTrackTxn(state.state);
+ setTransaction(state.txn);
+ setDataState(state.componentState);
+
+ }
+
+ protected void releaseThreadState() {
+ // Remove thread locals
+ if ( trackTxn != null )
+ trackTxn.remove() ;
+ componentState.remove();
+
+// java version "1.8.0_31"
+// Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
+// Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode)
+//
+// openjdk version "1.8.0_40-internal"
+// OpenJDK Runtime Environment (build 1.8.0_40-internal-b09)
+// OpenJDK 64-Bit Server VM (build 25.40-b13, mixed mode)
+
+ // This one is very important else the memory usage grows. Not clear why.
+ // A Transaction has an internal AtomicReference. Replacing the AtomicReference
+ // with a plain member variable slows the growth down greatly.
+ threadTxn.remove() ;
+ }
+
+ protected void clearInternal() {
+ trackTxn = null ;
+ threadTxn = null ;
+ componentState = null ;
+ }
+
+ //protected ComponentState<X> getState()
+
+ protected X getDataState() { return componentState.get() ; }
+ protected void setDataState(X data) { componentState.set(data) ; }
+
+ protected Transaction getTransaction() { return threadTxn.get(); }
+ protected void setTransaction(Transaction txn) { threadTxn.set(txn); }
+
+ private void setTrackTxn(TxnState newState) {
+ if ( ! CHECKING ) return ;
+ trackTxn.set(newState);
+ }
+
+ // This is our record of the state - it is not necessarily the transactions
+ // view during changes. This class tracks the expected incomign state and
+ // the transaction
+ private TxnState getTrackTxn() {
+ if ( ! CHECKING ) return null ;
+ return trackTxn.get();
+ }
+ // -- Access object members.
+
+ // XXX Align to javadoc in TransactionalComponent.
+
+ /* There are two lifecycles, one for write transaction, one
+ * for read transactions. This affects how transaction end so
+ * when/if promoted read->write transactions happen, a promoted
+ * transaction will follow the write lifecycle.
+ *
+ * In both lifecyles, the implementer can assume that calls
+ * happen at the right points and called only as needed. Framework
+ * takes care of checking.
+ *
+ * Read lifecycle:
+ * A read transaction be be just begin(READ)-end() but may also
+ * have commit or abort before end. The _commitRead and _abortRead
+ * calls note if an explicit commit or abort occurs but may not be
+ * called. _endRead is always called exactly once.
+ *
+ * _commitRead
+ * _abortRead
+ * _endRead
+ * _complete
+ *
+ * Write lifecycle:
+ * A write transaction must have a commit() or abort() before end().
+ * The fraemwork will check this.
+ *
+ * If the transaction commits:
+ * _commitPrepareWrite
+ * _commitWrite -- The transaction is
+ * _commitEndWrite
+ *
+ * If the transaction aborts:
+ * _abortWrite
+ *
+ * After any lifecycle, a final call of
+ * _complete()
+ *
+ * indicates ths transaction has fully finished.
+ *
+ * Typically, an implementation does not need to take action in every call.
+ */
+
+// /**
+// *
+// * @param readWrite
+// * @param txnId
+// * @return
+// */
+// protected abstract X _begin(ReadWrite readWrite, TxnId txnId) ;
+//
+// /**
+// *
+// * @param txnId
+// * @param state
+// * @return
+// */
+// protected abstract ByteBuffer _commitPrepareWrite(TxnId txnId, X state) ;
+//
+// /**
+// *
+// * @param txnId
+// * @param state
+// */
+// protected abstract void _commitWrite(TxnId txnId, X state) ;
+//
+// /**
+// *
+// * @param txnId
+// * @param state
+// */
+// protected abstract void _commitEndWrite(TxnId txnId, X state) ;
+//
+// /**
+// *
+// * @param txnId
+// * @param state
+// */
+// protected abstract void _abortWrite(TxnId txnId, X state) ;
+//
+// /**
+// *
+// * @param txnId
+// * @param state
+// * @return
+// */
+// protected abstract ByteBuffer _commitRead(TxnId txnId, X state) ;
+//
+// /**
+// *
+// * @param txnId
+// * @param state
+// * @return
+// */
+// protected abstract ByteBuffer _abortRead(TxnId txnId, X state) ;
+//
+// /**
+// *
+// * @param txnId
+// * @param state
+// */
+// protected abstract void _complete(TxnId txnId, X state) ;
+//
+// /**
+// *
+// */
+// protected abstract void _shutdown() ;
+
+ protected abstract X _begin(ReadWrite readWrite, TxnId txnId) ;
+ protected abstract X _promote(TxnId txnId, X oldState) ;
+ protected abstract ByteBuffer _commitPrepare(TxnId txnId, X state) ;
+ protected abstract void _commit(TxnId txnId, X state) ;
+ protected abstract void _commitEnd(TxnId txnId, X state) ;
+ protected abstract void _abort(TxnId txnId, X state) ;
+ protected abstract void _complete(TxnId txnId, X state) ;
+ protected abstract void _shutdown() ;
+
+ protected ReadWrite getReadWriteMode() {
+ Transaction txn = getTransaction() ;
+ return txn.getMode() ;
+ }
+
+ protected boolean isActiveTxn() {
+ TxnState txnState = getTxnState() ;
+ if ( txnState == null )
+ return false ;
+ switch(getTxnState()) {
+ case INACTIVE: case END_ABORTED: case END_COMMITTED:
+ return false ;
+ case ACTIVE: case DETACHED: case PREPARE: case ABORTED: case COMMIT: case COMMITTED:
+ return true ;
+ //null: default: return false ;
+ // Get the compiler to check all states covered.
+ }
+ // Should not happen.
+ throw new InternalErrorException("Unclear transaction state") ;
+ }
+
+ protected boolean isReadTxn() { return ! isWriteTxn() ; }
+
+ protected boolean isWriteTxn() {
+ Transaction txn = getTransaction();
+ return txn.isWriteTxn() ;
+ }
+
+ protected void checkTxn() {
+ if ( ! isActiveTxn() )
+ throw new TransactionException("Not in a transaction") ;
+ }
+
+// protected void requireWriteTxn() {
+// Transaction txn = getTransaction();
+// if ( txn == null )
+// throw new TransactionException("Not a transaction");
+// else
+// txn.requireWriteTxn() ;
+// }
+
+ protected void checkWriteTxn() {
+ Transaction txn = getTransaction();
+ if ( txn == null )
+ throw new TransactionException("Not a transaction");
+ else
+ txn.requireWriteTxn() ;
+ }
+
+ // -- Access object members.
+
+ private TxnState getTxnState() {
+ Transaction txn = getTransaction() ;
+ if ( txn == null )
+ return null ;
+ return txn.getState() ;
+ }
+
+ private void checkAligned(Transaction transaction) {
+ if ( ! CHECKING ) return ;
+ Transaction txn = getTransaction();
+ if ( txn != transaction )
+ throw new TransactionException("Transaction is not the transaction of the thread") ;
+ }
+
+ private void checkState(TxnState expected) {
+ if ( ! CHECKING ) return ;
+ TxnState s = getTrackTxn() ;
+ if ( s != expected )
+ throw new TransactionException("Transaction is in state "+s+": expected state "+expected) ;
+ }
+
+ private void checkState(TxnState expected1, TxnState expected2) {
+ if ( ! CHECKING ) return ;
+ TxnState s = getTrackTxn() ;
+ if ( s != expected1 && s != expected2 )
+ throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+" or "+expected2) ;
+ }
+
+ // Avoid varargs ... undue worry?
+ private void checkState(TxnState expected1, TxnState expected2, TxnState expected3) {
+ if ( ! CHECKING ) return ;
+ TxnState s = getTrackTxn() ;
+ if ( s != expected1 && s != expected2 && s != expected3 )
+ throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+", "+expected2+" or "+expected3) ;
+ }
+
+ // private void checkStateNot(State unexpected) {
+// State s = state.get();
+// if ( s == unexpected )
+// throw new TransactionException("Transaction in unexpected state "+s) ;
+// }
+
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentWrapper.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentWrapper.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentWrapper.java
new file mode 100644
index 0000000..14ddf20
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalComponentWrapper.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn ;
+
+import java.nio.ByteBuffer ;
+
+public class TransactionalComponentWrapper implements TransactionalComponent {
+ protected final TransactionalComponent other ;
+
+ public TransactionalComponentWrapper(TransactionalComponent other) {
+ this.other = other ;
+ }
+
+ @Override
+ public void startRecovery() {
+ other.startRecovery() ;
+ }
+
+ @Override
+ public void recover(ByteBuffer ref) {
+ other.recover(ref) ;
+ }
+
+ @Override
+ public void finishRecovery() {
+ other.finishRecovery() ;
+ }
+
+ @Override
+ public void cleanStart() {
+ other.cleanStart() ;
+ }
+
+ @Override
+ public ComponentId getComponentId() {
+ return other.getComponentId() ;
+ }
+
+ @Override
+ public void begin(Transaction transaction) {
+ other.begin(transaction) ;
+ }
+
+ @Override
+ public boolean promote(Transaction transaction) {
+ return other.promote(transaction) ;
+ }
+
+ @Override
+ public ByteBuffer commitPrepare(Transaction transaction) {
+ return other.commitPrepare(transaction) ;
+ }
+
+ @Override
+ public void commit(Transaction transaction) {
+ other.commit(transaction) ;
+ }
+
+ @Override
+ public void commitEnd(Transaction transaction) {
+ other.commitEnd(transaction) ;
+ }
+
+ @Override
+ public void abort(Transaction transaction) {
+ other.abort(transaction) ;
+ }
+
+ @Override
+ public void complete(Transaction transaction) {
+ other.complete(transaction) ;
+ }
+
+ @Override
+ public SysTransState detach() {
+ return other.detach() ;
+ }
+
+ @Override
+ public void attach(SysTransState systemState) {
+ other.attach(systemState) ;
+ }
+
+ @Override
+ public void shutdown() {
+ other.shutdown() ;
+ }
+
+ @Override
+ public String toString() { return "W:"+other.toString() ; }
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalMRSW.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalMRSW.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalMRSW.java
new file mode 100644
index 0000000..0aa741a
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalMRSW.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+import java.util.concurrent.locks.Lock ;
+import java.util.concurrent.locks.ReadWriteLock ;
+import java.util.concurrent.locks.ReentrantReadWriteLock ;
+
+import org.apache.jena.atlas.logging.Log ;
+
+import org.apache.jena.query.ReadWrite ;
+
+/** Implementation of the component interface for {@link TransactionalComponent}.
+ * Useful for in-memory transactions that do not provide durability or abort (undo).
+ * When retro fitting to other systems, that may be the best that can be done.
+ */
+public class TransactionalMRSW extends TransactionalComponentLifecycle<Object> {
+ // MRSW implementation of TransactionMVCC
+ // XXX Update to Jena style TransactionalLock
+ private ReadWriteLock lock = new ReentrantReadWriteLock() ;
+
+ public TransactionalMRSW(ComponentId componentId) {
+ super(componentId) ;
+ }
+
+ // ---- Recovery phase
+ @Override
+ public void startRecovery() {}
+
+ @Override
+ public void recover(ByteBuffer ref) {
+ Log.warn(this, "Called to recover a transaction (ignored)") ;
+ }
+
+ @Override
+ public void finishRecovery() { }
+
+ @Override
+ public void cleanStart() {}
+
+ private Lock getLock() {
+ return ( ReadWrite.WRITE.equals(getReadWriteMode()) ) ? lock.writeLock() : lock.readLock() ;
+ }
+
+ @Override
+ protected Object _begin(ReadWrite readWrite, TxnId thisTxnId) {
+ Lock lock = getLock() ;
+ // This is the point that makes this MRSW (readers OR writer), not MR+SW (readers and a writer)
+ lock.lock();
+ if ( isWriteTxn() )
+ startWriteTxn();
+ else
+ startReadTxn();
+ return createState();
+ }
+
+ private Object createState() {
+ return new Object();
+ }
+
+ @Override
+ protected Object _promote(TxnId txnId, Object state) {
+ // We have a read lock, the transaction coordinator has said
+ // it's OK (from it's point-of-view) to promote so this should succeed.
+ // We have a read lock - theer are no other writers.
+ boolean b = lock.writeLock().tryLock();
+ if ( ! b ) {
+ Log.warn(this, "Failed to promote");
+ return false;
+ }
+ lock.readLock().unlock();
+ return createState();
+ }
+
+ // Checks.
+
+ protected void startReadTxn() {}
+ protected void startWriteTxn() {}
+ protected void finishReadTxn() {}
+ protected void finishWriteTxn() {}
+
+ @Override
+ protected ByteBuffer _commitPrepare(TxnId txnId, Object obj) {
+ return null ;
+ }
+
+ @Override
+ protected void _commit(TxnId txnId, Object obj) {
+ clearup() ;
+ }
+
+ @Override
+ protected void _commitEnd(TxnId txnId, Object obj) {
+ clearup() ;
+ }
+
+ @Override
+ protected void _abort(TxnId txnId, Object obj) {
+ clearup() ;
+ }
+
+ @Override
+ protected void _complete(TxnId txnId, Object obj) {
+ }
+
+ @Override
+ protected void _shutdown() {
+ lock = null ;
+ }
+
+ private void clearup() {
+ Lock lock = getLock() ;
+ if ( isWriteTxn() )
+ finishWriteTxn();
+ else
+ finishReadTxn();
+ lock.unlock();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalSystem.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalSystem.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalSystem.java
new file mode 100644
index 0000000..cca13bb
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalSystem.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import org.apache.jena.dboe.transaction.Transactional;
+import org.apache.jena.query.ReadWrite ;
+
+/** Implementation side of a {@link Transactional}.
+ * {@link Transactional} presents the application facing view
+ * whereas this has all the possible steps of an implementation.
+ * Normally, the implementation of {@link #commit} is split up.
+ */
+public interface TransactionalSystem extends Transactional {
+
+ @Override
+ public default void commit() {
+ commitPrepare() ;
+ commitExec() ;
+ }
+
+ /** Do the 2-phase "prepare" step after which
+ * the transaction coordinator decides whether to commit
+ * or abort. A TransactionalSystem must be prepared for
+ * both possibilities.
+ */
+ public void commitPrepare();
+
+ /** Do the 2-phase "commit" step */
+ public void commitExec();
+
+ /** Suspend this transaction, detaching from the current thread.
+ * A new transaction on this thread can performed but the detached
+ * transaction still exists and if it is a write transaction
+ * it can still block other write transactions.
+ */
+ public TransactionCoordinatorState detach() ;
+
+ /**
+ * Attach a transaction to this thread.
+ * A transaction system implementation usually imposes a rule that
+ * only one thread can have a transaction attached at a time.
+ */
+ public void attach(TransactionCoordinatorState coordinatorState) ;
+
+ /** Get the associated {@link TransactionCoordinator} */
+ public TransactionCoordinator getTxnMgr() ;
+
+ /** Return the Read/Write state from the point of view of the caller.
+ * Return null when not in a transaction.
+ */
+ public ReadWrite getState() ;
+
+ @Override
+ public default boolean isInTransaction() { return getState() != null ; }
+
+ /** Return an information view of the transaction for this thread, if any.
+ * Returns null when there is no active transaction for this tread.
+ */
+ public TransactionInfo getTransactionInfo() ;
+
+ /** Return the transaction object for this thread.
+ * Low-level use only.
+ * To get information about the current transaction, call {@link #getTransactionInfo}.
+ */
+ public Transaction getThreadTransaction() ;
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnId.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnId.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnId.java
new file mode 100644
index 0000000..166b487
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnId.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+
+/**
+ * {@code TxnId} is a identfier for a transaction.
+ * A component in a transaction can use it as a unique key.
+ * The {@code TxnId}
+ * <ul>
+ * <li>must be unique across a JVM run
+ * <li>unique across JVm runs if used as a persistent name
+ * <li>Must provide value equality semantics (two {@code TxnId} are {@code .equals}
+ * if
+ * </ul>
+ * <p>
+ * It is preferrable that the TxnId is global unique over time and space.
+ */
+public interface TxnId {
+// public static TxnId create() { return TxnIdSimple.create() ; }
+//
+// public static TxnId create(byte[] bytes) {
+// switch(bytes.length) {
+// case 8: return TxnIdSimple.create(bytes) ;
+// case 16: return TxnIdUuid.create(bytes) ;
+// default:
+// throw new TransactionException("TxnId bytes unrecognized: length="+bytes.length) ;
+// }
+// }
+//
+ // Reminder to implement.
+ @Override
+ public int hashCode() ;
+ @Override
+ public boolean equals(Object other) ;
+
+ public String name() ;
+ public byte[] bytes() ;
+ /** A long that is a subset, or all or, the bytes.
+ * This should be unique for the lifetime of the transaction and
+ * ideally unique per system instance. It is not a persistent record
+ * of a transaction, it is for a transaction identifier in running code.
+ * ("system" maybe larger than on e JVM).
+ */
+ public long runtime() ;
+
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdFactory.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdFactory.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdFactory.java
new file mode 100644
index 0000000..a93d5b5
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import org.apache.commons.lang3.NotImplementedException ;
+
+/** Factory for some forms of {@link TxnId}.
+ * This is ony some possible {@link TxnIdGenerator}
+ *
+ * @see TxnId
+ */
+public class TxnIdFactory {
+ /** Generator for {@link TxnId}s for the counter based implemention. */
+ public static final TxnIdGenerator txnIdGenSimple = ()->TxnIdSimple.create() ;
+ /** Generator for {@link TxnId}s for the UUID based implemention. */
+ public static final TxnIdGenerator txnIdGenUuid = ()->TxnIdUuid.create() ;
+
+ /** Return the default, good enough for one JVM
+ * (usually the simple counter based implemention)
+ */
+ public static TxnId create() {
+ return createSimple() ;
+ }
+
+ /** Return a TxnId from the counter based implemention. */
+ public static TxnId createSimple() {
+ return txnIdGenSimple.generate() ;
+ }
+
+ /** Return a TxnId from the UUID based implemention. */
+ public static TxnId createUuid() {
+ return txnIdGenUuid.generate() ;
+ }
+
+ public static TxnId create(byte[] bytes) {
+ switch(bytes.length) {
+ case 8 : return TxnIdSimple.create(bytes) ;
+ case 16 :return TxnIdUuid.create(bytes) ;
+ default:
+ throw new NotImplementedException("Unrcognized bytes length: "+bytes.length) ;
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdGenerator.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdGenerator.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdGenerator.java
new file mode 100644
index 0000000..ab4e5ba
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdGenerator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+
+/**
+ * Generator of {@link TxnId}s.
+ * {@code TxnId} is a identfier for a transaction.
+ * A component in a transaction can use it as a unique key.
+ * The {@code TxnId}
+ * <ul>
+ * <li>must be unique across a JVM run
+ * <li>unique across JVm runs if used as a persistent name
+ * <li>Must provide value equality semantics (two {@code TxnId} are {@code .equals}
+ * if
+ * </ul>
+ * <p>
+ * It is preferrable that the TxnId is global unique over time and space.
+ */
+public interface TxnIdGenerator {
+ public TxnId generate() ;
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdSimple.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdSimple.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdSimple.java
new file mode 100644
index 0000000..d167cf7
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdSimple.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.util.concurrent.atomic.AtomicLong ;
+
+import org.apache.jena.atlas.lib.Bytes ;
+
+/**
+ * Simple TxnId, mainly for debugging.
+ */
+public class TxnIdSimple implements TxnId {
+ private static AtomicLong counter = new AtomicLong(0) ;
+
+ static TxnIdSimple create() {
+ return new TxnIdSimple(counter.incrementAndGet()) ;
+ }
+
+ public static TxnIdSimple create(byte[] bytes) {
+ return new TxnIdSimple(Bytes.getLong(bytes)) ;
+ }
+
+ private final long x ;
+
+ public TxnIdSimple(long x) {
+ this.x = x ;
+ }
+
+ @Override
+ public String name() {
+ return String.format("0x%04X",x) ;
+ }
+
+ @Override
+ public byte[] bytes() {
+ return Bytes.packLong(x) ;
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.hashCode(x) ;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if ( this == obj )
+ return true ;
+ if ( obj == null )
+ return false ;
+ if ( getClass() != obj.getClass() )
+ return false ;
+ TxnIdSimple other = (TxnIdSimple)obj ;
+ if ( x != other.x )
+ return false ;
+ return true ;
+ }
+
+ @Override
+ public String toString() {
+ return "txn:"+x ;
+ }
+
+ @Override
+ public long runtime() {
+ return x ;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdUuid.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdUuid.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdUuid.java
new file mode 100644
index 0000000..7e655f2
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnIdUuid.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+import java.util.UUID ;
+
+import org.apache.jena.atlas.lib.Bytes ;
+import org.apache.jena.dboe.migrate.L;
+import org.apache.jena.shared.uuid.JenaUUID ;
+
+/** {@link TxnId} based on a {@link UUID}.
+ */
+public class TxnIdUuid implements TxnId {
+
+ static TxnIdUuid create() {
+ UUID id = JenaUUID.generate().asUUID() ; // UUID.randomUUID() ;
+ return new TxnIdUuid(id) ;
+ }
+
+ public static TxnIdUuid create(byte[] bytes) {
+ long mostSignificantBits = Bytes.getLong(bytes, 0) ;
+ long leastSignificantBits = Bytes.getLong(bytes, 8) ;
+ return new TxnIdUuid(mostSignificantBits, leastSignificantBits) ;
+ }
+
+ private long mostSignificantBits ;
+ private long leastSignificantBits ;
+ private byte[] bytes = null ;
+ private String name = null ;
+
+ /*package*/ TxnIdUuid(UUID id) {
+ mostSignificantBits = id.getMostSignificantBits() ;
+ leastSignificantBits = id.getLeastSignificantBits() ;
+ }
+
+ /*package*/ TxnIdUuid(long mostSig, long leastSig) {
+ mostSignificantBits = mostSig ;
+ leastSignificantBits = leastSig ;
+ }
+
+ @Override
+ public String name() {
+ if ( name == null )
+ name = L.uuidToString(mostSignificantBits, leastSignificantBits) ;
+ return name ;
+ }
+
+ @Override
+ public byte[] bytes() {
+ if ( bytes == null )
+ bytes = L.uuidAsBytes(mostSignificantBits, leastSignificantBits) ;
+ return bytes ;
+ }
+
+ @Override
+ public long runtime() {
+ // In type 1, the mostSignificantBits have the timestamp in it.
+ return mostSignificantBits ;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31 ;
+ int result = 1 ;
+ result = prime * result + (int)(leastSignificantBits ^ (leastSignificantBits >>> 32)) ;
+ result = prime * result + (int)(mostSignificantBits ^ (mostSignificantBits >>> 32)) ;
+ return result ;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if ( this == obj )
+ return true ;
+ if ( obj == null )
+ return false ;
+ if ( getClass() != obj.getClass() )
+ return false ;
+ TxnIdUuid other = (TxnIdUuid)obj ;
+ if ( leastSignificantBits != other.leastSignificantBits )
+ return false ;
+ if ( mostSignificantBits != other.mostSignificantBits )
+ return false ;
+ return true ;
+ }
+
+ @Override
+ public String toString() {
+ //return name() ;
+ return String.format("[%04X]", mostSignificantBits&0xFFFF) ;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnState.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnState.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnState.java
new file mode 100644
index 0000000..c2eb741
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TxnState.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
+/** The states of the transaction lifecycle */
+public enum TxnState { INACTIVE, ACTIVE, DETACHED, PREPARE, COMMIT, COMMITTED, ABORTED, END_COMMITTED, END_ABORTED }
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/Journal.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/Journal.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/Journal.java
new file mode 100644
index 0000000..f2cd0bc
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/Journal.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn.journal;
+
+import static org.apache.jena.dboe.sys.Sys.SizeOfInt;
+
+import java.nio.ByteBuffer ;
+import java.util.Iterator ;
+import java.util.zip.Adler32 ;
+
+import org.apache.jena.atlas.iterator.IteratorSlotted ;
+import org.apache.jena.atlas.lib.ByteBufferLib ;
+import org.apache.jena.atlas.lib.Closeable ;
+import org.apache.jena.atlas.lib.FileOps ;
+import org.apache.jena.atlas.lib.Sync ;
+import org.apache.jena.dboe.base.file.BufferChannel;
+import org.apache.jena.dboe.base.file.BufferChannelFile;
+import org.apache.jena.dboe.base.file.BufferChannelMem;
+import org.apache.jena.dboe.base.file.Location;
+import org.apache.jena.dboe.sys.Names;
+import org.apache.jena.dboe.transaction.txn.ComponentId;
+import org.apache.jena.dboe.transaction.txn.PrepareState;
+import org.apache.jena.dboe.transaction.txn.TransactionException;
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
+
+/** A transaction journal.
+* The journal is append-only for writes, with truncation of the file
+* every so often. It is read during recovery.
+* The size of entries depends on per-component redo/undo records;
+* the control records like COMMIT are quite small.
+* Entries have a CRC to ensure that part-entries are not acted on.
+*/
+
+public final
+class Journal implements Sync, Closeable
+{
+ private static final boolean LOGGING = false ;
+ private static Logger log = LoggerFactory.getLogger(Journal.class) ;
+
+ private static boolean logging() {
+ return LOGGING && log.isInfoEnabled() ;
+ }
+
+ private static void log(String fmt, Object...args) {
+ if ( ! logging() )
+ return ;
+ log.info(String.format(fmt, args));
+ }
+
+ private BufferChannel channel ;
+ private long position ;
+
+ // Header: fixed, inc CRC
+ // length of data 4 bytes
+ // CRC of whole entry. 4 bytes
+ // entry type 4 bytes (1 byte and 3 alignment)
+ // component 16 bytes (fixed??)
+ // Data area : variable
+ // Bytes
+
+ private static final int posnLength = 0 ;
+ private static final int posnCRC = posnLength + SizeOfInt ;
+ private static final int posnEntry = posnCRC + SizeOfInt ;
+ private static final int posnComponent = posnEntry + SizeOfInt ;
+ // Start of the component data area.
+ private static final int posnData = posnComponent + ComponentId.SIZE ;
+
+ // Currently, the header is fixed size so this is the size.
+ private static int HeaderLen = posnData-posnLength ;
+
+ private ByteBuffer header = ByteBuffer.allocate(HeaderLen) ;
+
+ public static boolean exists(Location location) {
+ if ( location.isMem() )
+ return false ;
+ return FileOps.exists(journalFilename(location)) ;
+ }
+
+ public static Journal create(Location location) {
+ BufferChannel chan ;
+ String channelName = journalFilename(location) ;
+ if ( location.isMem() )
+ chan = BufferChannelMem.create(channelName) ;
+ else
+ chan = BufferChannelFile.create(channelName) ;
+ return create(chan) ;
+ }
+
+ public static Journal create(BufferChannel chan) {
+ return new Journal(chan) ;
+ }
+
+ private static String journalFilename(Location location) {
+ return location.absolute(Names.journalFile) ;
+ }
+
+ private Journal(BufferChannel channel) {
+ this.channel = channel ;
+ position = 0 ;
+ }
+
+ // synchronized : excessive?
+ // Given the calling context, we know it's thread safe.
+
+ synchronized public long writeJournal(JournalEntry entry) {
+ long posn = write(entry.getType(), entry.getComponentId(), entry.getByteBuffer()) ;
+
+ if ( entry.getPosition() < 0 ) {
+ entry.setPosition(posn) ;
+ entry.setEndPosition(position) ;
+ }
+ return posn ;
+ }
+
+// /** Write an entry and return its location in the journal */
+// synchronized public void write(List<PrepareState> prepareStates) {
+// prepareStates.forEach(this::write) ;
+// }
+
+ public long write(PrepareState prepareState) {
+ return write(JournalEntryType.REDO, prepareState.getComponent(), prepareState.getData()) ;
+ }
+
+ /** Write an entry and return it's location in the journal */
+ synchronized public long write(JournalEntryType type, ComponentId componentId, ByteBuffer buffer) {
+ // Check buffer set right.
+ if ( LOGGING ) {
+ log("write@%-3d >> %s %s %s", position, type.name(),
+ componentId == null ? "<null>" : componentId.label(),
+ buffer == null ? "<null>" : ByteBufferLib.details(buffer)) ;
+ }
+
+ long posn = position ;
+ int len = -1 ;
+ int bufferLimit = -1 ;
+ int bufferPosition = -1 ;
+ if ( buffer != null ) {
+ bufferLimit = buffer.limit() ;
+ bufferPosition = buffer.position() ;
+ buffer.rewind() ;
+ len = buffer.remaining() ;
+ }
+
+ // Header: (length/4, crc/4, entry/4, component/16)
+
+ header.clear() ;
+ header.putInt(len) ;
+ header.putInt(0) ; // Set CRC to zero
+ header.putInt(type.id) ;
+ header.put(componentId.getBytes()) ;
+ header.flip() ;
+ // Need to put CRC in before writing.
+
+ Adler32 adler = new Adler32() ;
+ adler.update(header.array()) ;
+
+ if ( len > 0 ) {
+ adler.update(buffer) ;
+ buffer.rewind() ;
+ }
+
+ int crc = (int)adler.getValue() ;
+ header.putInt(posnCRC, crc) ;
+ if ( LOGGING )
+ log("write@ -- crc = %s", Integer.toHexString(crc) ) ;
+ channel.write(header) ;
+ if ( len > 0 ) {
+ channel.write(buffer) ;
+ buffer.position(bufferPosition) ;
+ buffer.limit(bufferLimit) ;
+ }
+ position += HeaderLen + len ;
+ if ( LOGGING )
+ log("write@%-3d << %s", position, componentId.label()) ;
+
+ if ( len > 0 ) {
+ buffer.position(bufferPosition) ;
+ buffer.limit(bufferLimit) ;
+ }
+
+ return posn ;
+ }
+
+ synchronized public JournalEntry readJournal(long id) {
+ return _readJournal(id) ;
+ }
+
+ private JournalEntry _readJournal(long id) {
+ long x = channel.position() ;
+ if ( x != id )
+ channel.position(id) ;
+ JournalEntry entry = _read() ;
+ long x2 = channel.position() ;
+ entry.setPosition(id) ;
+ entry.setEndPosition(x2) ;
+ if ( x != id )
+ channel.position(x) ;
+ return entry ;
+ }
+
+ // read one entry at the channel position.
+ // Move position to end of read.
+ private JournalEntry _read() {
+ if ( LOGGING ) {
+ log("read@%-3d >>", channel.position()) ;
+ }
+
+ header.clear() ;
+ int lenRead = channel.read(header) ;
+ if ( lenRead == -1 ) {
+ // probably broken file.
+ throw new TransactionException("Read off the end of a journal file") ;
+ // return null ;
+ }
+ if ( lenRead != header.capacity() )
+ throw new TransactionException("Partial read of journal file") ;
+
+ header.rewind() ;
+ // Header: (length/4, crc/4, entry/4, component/16)
+ int len = header.getInt() ;
+ int checksum = header.getInt() ;
+ header.putInt(posnCRC, 0) ;
+ int entryType = header.getInt() ;
+ byte[] bytes = new byte[ComponentId.SIZE] ;
+ header.get(bytes) ;
+ ComponentId component = ComponentId.create(null, bytes) ;
+
+ Adler32 adler = new Adler32() ;
+ adler.update(header.array()) ;
+
+ ByteBuffer bb = null ;
+ if ( len > 0 ) {
+ bb = ByteBuffer.allocate(len) ;
+ lenRead = channel.read(bb) ;
+ if ( lenRead != len )
+ throw new TransactionException("Failed to read the journal entry data: wanted " + len + " bytes, got " + lenRead) ;
+ bb.rewind() ;
+ adler.update(bb) ;
+ bb.rewind() ;
+ }
+
+ int crc = (int)adler.getValue() ;
+ if ( checksum != crc )
+ throw new TransactionException("Checksum error reading from the Journal. "+Integer.toHexString(checksum)+" / "+Integer.toHexString(crc)) ;
+
+ JournalEntryType type = JournalEntryType.type(entryType) ;
+ JournalEntry entry = new JournalEntry(type, component, bb) ;
+ if ( LOGGING )
+ log("read@%-3d >> %s", channel.position(), entry) ;
+ return entry ;
+ }
+
+ /**
+ * Iterator of entries from current point in Journal, going forward. Must be
+ * JournalEntry aligned at start.
+ */
+ private class IteratorEntries extends IteratorSlotted<JournalEntry> {
+ JournalEntry slot = null ;
+ final long endPoint ;
+ long iterPosn ;
+
+ public IteratorEntries(long startPosition) {
+ iterPosn = startPosition ;
+ endPoint = channel.size() ;
+ }
+
+ @Override
+ protected JournalEntry moveToNext() {
+ // synchronized necessary? Outer policy is single thread?
+ synchronized (Journal.this) {
+ if ( iterPosn >= endPoint )
+ return null ;
+ JournalEntry e = _readJournal(iterPosn) ;
+ iterPosn = e.getEndPosition() ;
+ return e ;
+ }
+ }
+
+ @Override
+ protected boolean hasMore() {
+ return iterPosn < endPoint ;
+ }
+ }
+
+ public Iterator<JournalEntry> entries() {
+ return new IteratorEntries(0) ;
+ }
+
+ synchronized public Iterator<JournalEntry> entries(long startPosition) {
+ return new IteratorEntries(startPosition) ;
+ }
+
+ @Override
+ public void sync() { channel.sync() ; }
+
+ @Override
+ public void close() { channel.close() ; }
+
+ public long size() { return channel.size() ; }
+
+ public boolean isEmpty() { return channel.size() == 0 ; }
+
+ public void truncate(long size) { channel.truncate(size) ; }
+
+ public void reset() {
+ truncate(0) ;
+ sync() ;
+ }
+
+// public void append() { position(size()) ; }
+
+ public long position() { return channel.position() ; }
+
+// public void position(long posn) { channel.position(posn) ; }
+
+ public String getFilename() { return channel.getFilename() ; }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/JournalControl.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/JournalControl.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/JournalControl.java
new file mode 100644
index 0000000..c0dde47
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/JournalControl.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn.journal;
+
+import java.util.Iterator ;
+
+import org.apache.jena.dboe.base.file.BufferChannelFile;
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
+
+
+public class JournalControl
+{
+ private static Logger log = LoggerFactory.getLogger(JournalControl.class) ;
+
+ // In TransactionCoordinator
+// // Interface
+// public static void recovery(TransactionCoordinator txnCoord)
+// {
+// txnCoord.getJournal() ;
+//
+// }
+//
+// public static void replay(Transaction transaction)
+// {}
+
+ /** Dump a journal - debug support function - opens the journal specially - inconsistent views possible */
+ public static void print(String filename) {
+ BufferChannelFile chan = BufferChannelFile.createUnmanaged(filename, "r") ;
+ Journal journal = Journal.create(chan) ;
+ JournalControl.print(journal) ;
+ chan.close() ;
+ }
+
+ public static void print(Journal journal) {
+ System.out.println("Size: "+journal.size()) ;
+ Iterator<JournalEntry> iter = journal.entries() ;
+
+ for ( ; iter.hasNext() ; )
+ {
+ JournalEntry e = iter.next() ;
+ //System.out.println("Posn: "+journal.position()+" : ("+(journal.size()-journal.position())+")") ;
+ System.out.println(JournalEntry.format(e)) ;
+ }
+ }
+
+// /** Recover a base storage DatasetGraph */
+// public static void recovery(Location location)
+// {
+// if ( location.isMem() )
+// return ;
+//
+// // Do we need to recover?
+// Journal journal = findJournal(location) ;
+// if ( journal == null || journal.isEmpty() )
+// return ;
+//
+// for ( FileRef fileRef : dsg.getConfig().nodeTables.keySet() )
+// recoverNodeDat(dsg, fileRef) ;
+// recoverFromJournal(dsg.getConfig(), journal) ;
+//
+// journal.close() ;
+// // Recovery complete. Tidy up. Node journal files have already been handled.
+// if ( journal.getFilename() != null )
+// {
+// if ( FileOps.exists(journal.getFilename()) )
+// FileOps.delete(journal.getFilename()) ;
+// }
+// }
+//
+// private static Journal findJournal(Location location)
+// {
+// String journalFilename = location.absolute(Names.journalFile) ;
+// File f = new File(journalFilename) ;
+// //if ( FileOps.exists(journalFilename)
+//
+// if ( f.exists() && f.isFile() && f.length() > 0 )
+// return Journal.create(location) ;
+// else
+// return null ;
+// }
+//
+// /** Recovery from a journal.
+// * Find if there is a commit record; if so, replay the journal to that point.
+// * Try to see if there is another commit record ...
+// * Return true if a recovery was attempted; return false if we decided no work needed.
+// */
+// public static boolean recoverFromJournal(StorageConfig sConf, Journal jrnl)
+// {
+// if ( jrnl.isEmpty() )
+// return false ;
+//
+// long posn = 0 ;
+// for ( ;; )
+// {
+// // Any errors indicate a partially written journal.
+// // A commit was not written properly in the prepare phase.
+// // e.g. JVM died half-way though writing the prepare phase data.
+// // The valid journal ends at this point. Exit loop and clean up.
+//
+// long x ;
+// try { x = scanForCommit(jrnl, posn) ; }
+// catch (TDBException ex) { x = -1 ; }
+//
+// if ( x == -1 ) break ;
+// recoverSegment(jrnl, posn, x, sConf) ;
+// posn = x ;
+// }
+//
+// // We have replayed the journals - clean up.
+// jrnl.truncate(0) ;
+// jrnl.sync() ;
+// syncAll(sConf) ;
+// return true ;
+// }
+//
+// /** Scan to a commit entry, starting at a given position in the journal.
+// * Return address of entry after commit if found, else -1.
+// */
+// private static long scanForCommit(Journal jrnl, long startPosn)
+// {
+// Iterator<JournalEntry> iter = jrnl.entries(startPosn) ;
+// try {
+// for ( ; iter.hasNext() ; )
+// {
+// JournalEntry e = iter.next() ;
+// if ( e.getType() == JournalEntryType.Commit )
+// return e.getEndPosition() ;
+// }
+// return -1 ;
+// } finally { Iter.close(iter) ; }
+// }
+//
+// /** Recover one transaction from the start position given.
+// * Scan to see if theer is a commit; if found, play the
+// * journal from the start point to the commit.
+// * Return true is a commit was found.
+// * Leave journal positioned just after commit or at end if none found.
+// */
+// private static void recoverSegment(Journal jrnl, long startPosn, long endPosn, StorageConfig sConf)
+// {
+// Iterator<JournalEntry> iter = jrnl.entries(startPosn) ;
+// iter = jrnl.entries(startPosn) ;
+// try {
+// for ( ; iter.hasNext() ; )
+// {
+// JournalEntry e = iter.next() ;
+// if ( e.getType() == JournalEntryType.Commit )
+// {
+// if ( e.getEndPosition() != endPosn )
+// log.warn(format("Inconsistent: end at %d; expected %d", e.getEndPosition(), endPosn)) ;
+// return ;
+// }
+// replay(e, sConf) ;
+// }
+// } finally { Iter.close(iter) ; }
+// }
+//
+// /** Recover a node data file (".dat").
+// * Node data files are append-only so recovering, then not using the data is safe.
+// * Node data file is a precursor for full recovery that works from the master journal.
+// */
+// private static void recoverNodeDat(DatasetGraphTDB dsg, FileRef fileRef)
+// {
+// // See DatasetBuilderTxn - same name generation code.
+// // [TxTDB:TODO]
+//
+// RecordFactory recordFactory = new RecordFactory(SystemTDB.LenNodeHash, SystemTDB.SizeOfNodeId) ;
+// NodeTable baseNodeTable = dsg.getConfig().nodeTables.get(fileRef) ;
+// String objFilename = fileRef.getFilename()+"-"+Names.extJournal ;
+// objFilename = dsg.getLocation().absolute(objFilename) ;
+// File jrnlFile = new File(objFilename) ;
+// if ( jrnlFile.exists() && jrnlFile.length() > 0 )
+// {
+// syslog.info("Recovering node data: "+fileRef.getFilename()) ;
+// ObjectFile dataJrnl = FileFactory.createObjectFileDisk(objFilename) ;
+// NodeTableTrans ntt = new NodeTableTrans(null, objFilename, baseNodeTable, new IndexMap(recordFactory), dataJrnl) ;
+// ntt.append() ;
+// ntt.close() ;
+// dataJrnl.close() ;
+// baseNodeTable.sync() ;
+// }
+// if ( jrnlFile.exists() )
+// FileOps.delete(objFilename) ;
+// }
+//
+// public static void replay(Transaction transaction)
+// {
+// if ( syslog.isDebugEnabled())
+// syslog.debug("Replay "+transaction.getLabel()) ;
+// Journal journal = transaction.getJournal() ;
+// DatasetGraphTDB dsg = transaction.getBaseDataset() ;
+// // Currently, we (crudely) replay the whole journal.
+// replay(journal, dsg.getConfig()) ;
+// }
+//
+// /** Replay a journal onto a dataset */
+// public static void replay(Journal journal, DatasetGraphTDB dsg)
+// {
+// replay(journal, dsg.getConfig()) ;
+// }
+//
+// /** Replay a journal onto a store configuration (the file resources) */
+// private static void replay(Journal journal, StorageConfig sConf)
+// {
+// if ( journal.size() == 0 )
+// return ;
+//
+// journal.position(0) ;
+// try {
+// Iterator<JournalEntry> iter = journal.entries() ;
+//
+// for ( ; iter.hasNext() ; )
+// {
+// JournalEntry e = iter.next() ;
+// replay(e, sConf) ;
+//
+// // There is no point sync here.
+// // No writes via the DSG have been done.
+// // so all internal flags "syncNeeded" are false.
+// //dsg.sync() ;
+// }
+// }
+// catch (RuntimeException ex)
+// {
+// // Bad news travels fast.
+// syslog.error("Exception during journal replay", ex) ;
+// throw ex ;
+// }
+//
+// Collection<BlockMgr> x = sConf.blockMgrs.values() ;
+// for ( BlockMgr blkMgr : x )
+// blkMgr.syncForce() ;
+// // Must do a hard sync before this.
+// journal.truncate(0) ;
+// }
+//
+// /** return true for "go on" */
+// private static boolean replay(JournalEntry e, StorageConfig sConf)
+// {
+// switch (e.getType())
+// {
+// case Block:
+// {
+// BlockMgr blkMgr = sConf.blockMgrs.get(e.getFileRef()) ;
+// Block blk = e.getBlock() ;
+// log.debug("Replay: {} {}",e.getFileRef(), blk) ;
+// blk.setModified(true) ;
+// blkMgr.overwrite(blk) ;
+// return true ;
+// }
+// case Buffer:
+// {
+// BufferChannel chan = sConf.bufferChannels.get(e.getFileRef()) ;
+// ByteBuffer bb = e.getByteBuffer() ;
+// log.debug("Replay: {} {}",e.getFileRef(), bb) ;
+// chan.write(bb, 0) ; // YUK!
+// return true ;
+// }
+//
+// case Commit:
+// return false ;
+// case Abort:
+// case Object:
+// case Checkpoint:
+// errlog.warn("Unexpected block type: "+e.getType()) ;
+// }
+// return false ;
+// }
+//
+// private static void syncAll(StorageConfig sConf)
+// {
+// Collection<BlockMgr> x = sConf.blockMgrs.values() ;
+// for ( BlockMgr blkMgr : x )
+// blkMgr.syncForce() ;
+// Collection<BufferChannel> y = sConf.bufferChannels.values() ;
+// for ( BufferChannel bChan : y )
+// bChan.sync() ;
+// //sConf.nodeTables ;
+// }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/JournalEntry.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/JournalEntry.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/JournalEntry.java
new file mode 100644
index 0000000..4c6ff14
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/JournalEntry.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn.journal;
+
+import java.nio.ByteBuffer ;
+
+import org.apache.jena.atlas.lib.ByteBufferLib ;
+import org.apache.jena.atlas.lib.Bytes ;
+import org.apache.jena.dboe.transaction.txn.ComponentId;
+import org.apache.jena.dboe.transaction.txn.ComponentIds;
+
+public class JournalEntry
+{
+// static public final JournalEntry Redo = new JournalEntry(JournalEntryType.REDO) ;
+// static public final JournalEntry Undo = new JournalEntry(JournalEntryType.UNDO) ;
+
+
+ // Zero payload JournalEntry - create once.
+ static public final JournalEntry COMMIT = new JournalEntry(JournalEntryType.COMMIT, ComponentIds.idSystem) ;
+ static public final JournalEntry ABORT = new JournalEntry(JournalEntryType.ABORT, ComponentIds.idSystem) ;
+
+ private long position = -1 ; // Location in the Journal (if known).
+ private long endPosition = -1 ; // End location in the Journal: offset of next entry start.
+
+ private final JournalEntryType type ;
+ private final ComponentId componentId ;
+ private final ByteBuffer data ;
+
+ private JournalEntry(JournalEntryType type, ComponentId id) {
+ this(type, id, null) ;
+ }
+
+ public JournalEntry(JournalEntryType type, ComponentId componentId, ByteBuffer bytes) {
+ this.type = type ;
+ this.componentId = componentId ;
+ this.data = bytes ;
+ }
+
+ void setPosition(long posn) { position = posn ; }
+ void setEndPosition(long endPosn) { endPosition = endPosn ; }
+
+ public long getPosition() { return position ; }
+ long getEndPosition() { return endPosition ; }
+
+ public JournalEntryType getType() { return type ; }
+ public ComponentId getComponentId() { return componentId ; }
+ public ByteBuffer getByteBuffer() { return data ; }
+
+ @Override
+ public String toString()
+ {
+ return "JournalEntry: "+type+" "+componentId ;
+ }
+
+ static public String format(JournalEntry entry)
+ {
+ StringBuilder sbuff = new StringBuilder() ;
+
+ sbuff.append("Entry: ") ;
+ sbuff.append(" "+entry.type) ;
+ if ( entry.componentId != null ) {
+ String label = entry.componentId.label() ;
+ if ( label != null )
+ sbuff.append(label) ;
+ sbuff.append(" [..") ;
+ int z = Bytes.getInt(entry.componentId.getBytes(), entry.componentId.getBytes().length-4) ;
+ sbuff.append(Integer.toHexString(z)) ;
+ sbuff.append("]") ;
+ }
+ if ( entry.data != null )
+ sbuff.append(" "+ByteBufferLib.details(entry.data)) ;
+ return sbuff.toString() ;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/JournalEntryType.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/JournalEntryType.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/JournalEntryType.java
new file mode 100644
index 0000000..dc9cb9b
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/JournalEntryType.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn.journal;
+
+import org.apache.jena.atlas.lib.InternalErrorException ;
+import org.apache.jena.atlas.logging.Log ;
+
+/**
+ * Types of Journal entry.
+ * This set is quite general and so not all cases may be used in practice.
+ * <p>
+ * The id must be stable across new versions on the code as it ends up
+ * in the journal on-disk so we are explicit about id even though there is
+ * {@link Enum#ordinal}
+ */
+public enum JournalEntryType
+{
+ /*
+ * REDO, UNDO -- Actions (UNDO unused)
+ * COMMIT, ABORT -- Transaction action (ABORT unused)
+ * CHECKPOINT -- data written to the journal as a safe spill file (unused)
+ */
+ REDO(1), UNDO(2), COMMIT(3), ABORT(4)
+ /*, CHECKPOINT(6)*/
+ ;
+
+ final int id ;
+ JournalEntryType(int x) { id = x ; }
+ int getId() { return id ; }
+ static public JournalEntryType type(int x)
+ {
+ if ( x == REDO.id ) return REDO ;
+ else if ( x == UNDO.id ) return UNDO ;
+ else if ( x == COMMIT.id ) return COMMIT ;
+ else if ( x == ABORT.id ) return ABORT ;
+ //else if ( x == CHECKPOINT.id ) return CHECKPOINT ;
+ else {
+ Log.error(JournalEntryType.class, "Unknown type: "+x) ;
+ throw new InternalErrorException("Unknown type: "+x) ;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/package-info.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/package-info.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/package-info.java
new file mode 100644
index 0000000..5014f35
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.transaction.txn;
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/ComponentIdRegistry.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/ComponentIdRegistry.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/ComponentIdRegistry.java
deleted file mode 100644
index eba8aa6..0000000
--- a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/ComponentIdRegistry.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.transaction;
-
-
-
-public class ComponentIdRegistry {
-// // Not stable across JVMs hence "local"
-// private ComponentId localCid = ComponentId.create("Local", L.uuidAsBytes(UUID.randomUUID())) ;
-//
-// // No! byte[].equals is Object.equals
-//
-// // ComponentId equality is by bytes (not display label)
-// // so this maps bytes to a better form.
-// private Map<Holder, ComponentId> registry = new ConcurrentHashMap<>() ;
-//
-// public ComponentIdRegistry() { }
-//
-// public ComponentId registerLocal(String label, int index) {
-// return register(localCid, label, index) ;
-// }
-//
-// public ComponentId register(ComponentId base, String label, int index) {
-// byte[] bytes = base.bytes() ;
-// bytes = Arrays.copyOf(bytes, bytes.length) ;
-// int x = Bytes.getInt(bytes, bytes.length - SystemBase.SizeOfInt) ;
-// x = x ^ index ;
-// Bytes.setInt(x, bytes, bytes.length-SystemBase.SizeOfInt) ;
-// ComponentId cid = new ComponentId(label+"-"+index, bytes) ;
-// Holder h = new Holder(bytes) ;
-// registry.put(h, cid) ;
-// return cid ;
-// }
-//
-// public ComponentId lookup(byte[] bytes) {
-// Holder h = new Holder(bytes) ;
-// return registry.get(h) ;
-// }
-//
-// public void reset() {
-// registry.clear() ;
-// }
-//
-// // Makes equality the value of the bytes.
-// static class Holder {
-// private final byte[] bytes ;
-//
-// Holder(byte[] bytes) { this.bytes = bytes ; }
-//
-// @Override
-// public int hashCode() {
-// final int prime = 31 ;
-// int result = 1 ;
-// result = prime * result + Arrays.hashCode(bytes) ;
-// return result ;
-// }
-//
-// @Override
-// public boolean equals(Object obj) {
-// if ( this == obj )
-// return true ;
-// if ( obj == null )
-// return false ;
-// if ( getClass() != obj.getClass() )
-// return false ;
-// Holder other = (Holder)obj ;
-// if ( !Arrays.equals(bytes, other.bytes) )
-// return false ;
-// return true ;
-// }
-//
-// @Override
-// public String toString() {
-// return Bytes.asHex(bytes) ;
-// }
-// }
-}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/ThreadTxn.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/ThreadTxn.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/ThreadTxn.java
deleted file mode 100644
index f026104..0000000
--- a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/ThreadTxn.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.transaction;
-
-import java.util.Objects ;
-import java.util.concurrent.Executor ;
-import java.util.concurrent.Semaphore ;
-import java.util.concurrent.atomic.AtomicReference ;
-
-import org.apache.jena.query.ReadWrite ;
-
-import org.seaborne.dboe.sys.Sys ;
-import org.apache.jena.sparql.core.Transactional ;
-
-/**
- * An action that will happen on a different thread later when {@link #run} is
- * called. A thread is created and the transaction started during a call to the
- * creation operations {@link #threadTxnRead} or {@link #threadTxnWrite}.
- * The associated Runnable is called and the transaction completed when
- * {@link #run} is called. Being on a thread, the state of the world the
- * forked transaction sees is outside the creating thread which may itself be in a
- * transaction. Warning: creating a write transaction inside a write transaction
- * will cause deadlock.
- */
-public class ThreadTxn {
-
- /** Create a thread-backed delayed READ transaction action. */
- public static ThreadTxn threadTxnRead(Transactional trans, Runnable action) {
- return ThreadTxn.create(trans, ReadWrite.READ, action, false) ;
- }
-
- /** Create a thread-backed delayed WRITE action.
- * If called from inside a write transaction on the {@code trans},
- * this will deadlock.
- */
- public static ThreadTxn threadTxnWrite(Transactional trans, Runnable action) {
- return ThreadTxn.create(trans, ReadWrite.WRITE, action, true) ;
- }
-
- /** Create a thread-backed delayed WRITE-abort action (testing). */
- public static ThreadTxn threadTxnWriteAbort(Transactional trans, Runnable action) {
- return ThreadTxn.create(trans, ReadWrite.WRITE, action, false) ;
- }
-
- private final Semaphore semaStart ;
- private final Semaphore semaFinish ;
- private final AtomicReference<RuntimeException> thrownRuntimeException = new AtomicReference<>(null) ;
- private final AtomicReference<Error> thrownError = new AtomicReference<>(null) ;
- private final Runnable action ;
-
- private ThreadTxn(Runnable action) {
- this.action = action ;
- this.semaStart = new Semaphore(0, true) ;
- this.semaFinish = new Semaphore(0, true) ;
- }
-
- /**
- * Perform the Runnable, reporting any
- * {@link java.lang.RuntimeException} or {@link java.lang.Error}
- */
- public void run() {
- semaStart.release();
- semaFinish.acquireUninterruptibly() ;
- if ( thrownError.get() != null )
- throw thrownError.get() ;
- if ( thrownRuntimeException.get() != null )
- throw thrownRuntimeException.get() ;
- }
-
- // Called on the async thread.
- private void trigger() {
- try { action.run(); }
- catch (Error error) { thrownError.set(error) ; throw error ;}
- catch (RuntimeException ex) { thrownRuntimeException.set(ex) ; throw ex ; }
- }
-
- // System-shared executor.
- private static Executor executor = Sys.executor ;
-
- /*package*/ static ThreadTxn create(Transactional trans, ReadWrite mode, Runnable action, boolean isCommit) {
- Objects.requireNonNull(trans) ;
- Objects.requireNonNull(mode) ;
- Objects.requireNonNull(action) ;
-
- ThreadTxn threadAction = new ThreadTxn(action) ;
- // Startup semaphore so that the thread has started by the
- // time we exit this setup function.
- Semaphore semaStartup = new Semaphore(0, true) ;
- executor.execute( ()-> {
- // NB. trans.begin then semaStartup.release() ;
- // This ensures that the transaction has really started.
- trans.begin(mode) ;
-
- // Signal the creator (see below) that the transaction has started.
- semaStartup.release() ;
-
- // Wait for the signal to run the action.
- threadAction.semaStart.acquireUninterruptibly();
-
- try {
- // Performane the action, catch and record any RuntimeException or Error.
- threadAction.trigger() ;
-
- // Finish transaction (if no throwable)
- if ( mode == ReadWrite.WRITE ) {
- if ( isCommit )
- trans.commit();
- else
- trans.abort() ;
- trans.end() ;
- } else {
- // Read
- if ( isCommit )
- trans.commit();
- trans.end() ;
- }
- }
- catch (Throwable ex) {
- // Suppress now it has trigger transaction mechanism in
- // the presence of an unchecked exception.
- // Passed to the main thread via ThreadTxn
- }
- finally { threadAction.semaFinish.release() ; }
- }) ;
- // Don't return until the transaction has started.
- semaStartup.acquireUninterruptibly();
- return threadAction ;
- }
-}