You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2018/01/04 20:46:19 UTC
[05/14] jena git commit: JENA-1458: Promotion API integration for TDB2
JENA-1458: Promotion API integration for TDB2
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/d6770eaf
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/d6770eaf
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/d6770eaf
Branch: refs/heads/master
Commit: d6770eaf62f13d60bc5f414cfa6b2f3a50fca445
Parents: 64f6357
Author: Andy Seaborne <an...@apache.org>
Authored: Sun Dec 31 17:15:04 2017 +0000
Committer: Andy Seaborne <an...@apache.org>
Committed: Sun Dec 31 17:16:45 2017 +0000
----------------------------------------------------------------------
.../sparql/core/TransactionHandlerView.java | 7 +-
.../mem/TestDatasetGraphInMemoryPromote.java | 25 ----
.../transaction/AbstractTestTransPromote.java | 50 -------
.../jena/dboe/transaction/Transactional.java | 51 ++++---
.../dboe/transaction/TransactionalMonitor.java | 8 +-
.../dboe/transaction/txn/QuorumGenerator.java | 6 +-
.../jena/dboe/transaction/txn/Transaction.java | 22 +--
.../transaction/txn/TransactionCoordinator.java | 141 +++++++++++--------
.../dboe/transaction/txn/TransactionInfo.java | 15 +-
.../dboe/transaction/txn/TransactionalBase.java | 39 +++--
.../transaction/txn/TransactionalSystem.java | 9 --
.../TestTransactionCoordinatorControl.java | 32 ++---
.../transaction/TestTransactionLifecycle2.java | 61 ++++----
.../jena/dboe/transaction/TestTxnSwitching.java | 12 +-
.../apache/jena/tdb2/store/DatasetGraphTDB.java | 31 +++-
.../jena/tdb2/store/GraphViewSwitchable.java | 32 +----
.../store/AbstractTestTransPromoteTDB2.java | 67 ---------
.../jena/tdb2/store/TestTransPromoteTDB.java | 29 +---
.../tdb/transaction/TransactionManager.java | 2 +
.../tdb/transaction/TestTransPromoteTDB.java | 25 ----
20 files changed, 261 insertions(+), 403 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-arq/src/main/java/org/apache/jena/sparql/core/TransactionHandlerView.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/TransactionHandlerView.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/TransactionHandlerView.java
index 00d6a2a..63d7a22 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/core/TransactionHandlerView.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/TransactionHandlerView.java
@@ -19,7 +19,7 @@
package org.apache.jena.sparql.core;
import org.apache.jena.graph.impl.TransactionHandlerBase;
-import org.apache.jena.query.ReadWrite;
+import org.apache.jena.query.TxnType;
/** A graph TransactionHandler that for a graph view of a {@link DatasetGraph}*/
public class TransactionHandlerView extends TransactionHandlerBase
@@ -40,10 +40,7 @@ public class TransactionHandlerView extends TransactionHandlerBase
@Override
public void begin() {
- if ( false /* dsg.supportPromotion */)
- getDSG().begin(ReadWrite.READ);
- else
- getDSG().begin(ReadWrite.WRITE);
+ getDSG().begin(TxnType.READ_PROMOTE);
}
@Override
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TestDatasetGraphInMemoryPromote.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TestDatasetGraphInMemoryPromote.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TestDatasetGraphInMemoryPromote.java
index 4c19c37..647442b 100644
--- a/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TestDatasetGraphInMemoryPromote.java
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TestDatasetGraphInMemoryPromote.java
@@ -39,31 +39,6 @@ public class TestDatasetGraphInMemoryPromote extends AbstractTestTransPromote {
}
@Override
- protected boolean supportsReadCommitted() {
- return true ;
- }
-
- @Override
- protected void setPromotion(boolean b) {
- DatasetGraphInMemory.promotion = b ;
- }
-
- @Override
- protected boolean getPromotion() {
- return DatasetGraphInMemory.promotion ;
- }
-
- @Override
- protected void setReadCommitted(boolean b) {
- DatasetGraphInMemory.readCommittedPromotion = b ;
- }
-
- @Override
- protected boolean getReadCommitted() {
- return DatasetGraphInMemory.readCommittedPromotion ;
- }
-
- @Override
protected Class<JenaTransactionException> getTransactionExceptionClass() {
return JenaTransactionException.class ;
}
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransPromote.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransPromote.java b/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransPromote.java
index fd87dce..3378a87 100644
--- a/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransPromote.java
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransPromote.java
@@ -38,7 +38,6 @@ import org.apache.jena.system.Txn ;
import org.apache.log4j.Level ;
import org.apache.log4j.Logger ;
import org.junit.After ;
-import org.junit.Assume ;
import org.junit.Before ;
import org.junit.Test ;
@@ -75,58 +74,11 @@ public abstract class AbstractTestTransPromote {
}
}
- /**
- * Return true if this implement supports transaction promotion (i.e. a read
- * transaction can beome a write transaction if an update is attempted.
- * This need not be the default mode - see {@link #setPromotion(boolean)}.
- */
- protected abstract boolean supportsReadCommitted() ;
-
- /** Enable transaction promotion (it does not need to be the defaukl bahvaiour of the system under test.
- * A call of setPromotion(true) is made before each test.
- * The original setting is retored at the end of the test.
- * @deprecated Redundant - remove.
- */
- @Deprecated
- protected abstract void setPromotion(boolean b) ;
- /**
- * @deprecated Redundant - remove.
- */
- @Deprecated
- protected abstract boolean getPromotion() ;
-
- /**
- * If {@link #supportsReadCommitted} is true (whether by default or not),
- * then set/reset the state aroudn tests that test its behaviour.
- * @deprecated Redundant - remove.
- */
- @Deprecated
- protected abstract void setReadCommitted(boolean b) ;
- /**
- * @deprecated Redundant - remove.
- */
- @Deprecated
- protected abstract boolean getReadCommitted() ;
-
// The exact class used by exceptions of the system under test.
// TDB transctions are in the TDBException hierarchy
// so can't be JenaTransactionException.
protected abstract Class<? extends Exception> getTransactionExceptionClass() ;
- @Before
- public void before() {
- stdPromotion = getPromotion() ;
- stdReadCommitted = getReadCommitted() ;
- setPromotion(true);
- setReadCommitted(true);
- }
-
- @After
- public void after() {
- setPromotion(stdPromotion);
- setReadCommitted(stdReadCommitted);
- }
-
protected AbstractTestTransPromote(Logger[] loggers) {
this.loggers = loggers ;
}
@@ -154,7 +106,6 @@ public abstract class AbstractTestTransPromote {
// READ-add
private void run_01(TxnType txnType) {
- Assume.assumeTrue( supportsReadCommitted() );
DatasetGraph dsg = create() ;
dsg.begin(txnType) ;
dsg.add(q1) ;
@@ -369,7 +320,6 @@ public abstract class AbstractTestTransPromote {
}
private void promote_clash_active_writer(ExecutorService executor, boolean activeWriterCommit) {
- setReadCommitted(false) ;
Semaphore semaActiveWriterStart = new Semaphore(0) ;
Semaphore semaActiveWriterContinue = new Semaphore(0) ;
Semaphore semaPromoteTxnStart = new Semaphore(0) ;
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/Transactional.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/Transactional.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/Transactional.java
index ab8491f..91d5f6b 100644
--- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/Transactional.java
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/Transactional.java
@@ -19,7 +19,8 @@
package org.apache.jena.dboe.transaction;
import org.apache.jena.dboe.jenax.Txn;
-import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.query.ReadWrite;
+import org.apache.jena.query.TxnType;
/** Interface that encapsulates the begin/abort|commit/end operations.
* <p>The read lifecycle is:
@@ -36,21 +37,21 @@ import org.apache.jena.query.ReadWrite ;
* Directly called, code might look like:
* <pre>
* Transactional object = ...
- * object.begin(ReadWrite.READ) ;
+ * object.begin(ReadWrite.READ);
* try {
* ... actions inside a read transaction ...
- * } finally { object.end() ; }
+ * } finally { object.end(); }
* </pre>
* or
* <pre>
* Transactional object = ...
- * object.begin(ReadWrite.WRITE) ;
+ * object.begin(ReadWrite.WRITE);
* try {
* ... actions inside a write transaction ...
- * object.commit() ;
+ * object.commit();
* } finally {
* // This causes an abort if {@code commit} has not been called.
- * object.end() ;
+ * object.end();
* }
* </pre>
* Exceptions will not be thrown.
@@ -60,41 +61,49 @@ public interface Transactional extends org.apache.jena.sparql.core.Transactional
{
/** Start either a READ or WRITE transaction */
@Override
- public void begin(ReadWrite readWrite) ;
+ public default void begin(ReadWrite readWrite) {
+ begin(TxnType.convert(readWrite));
+ }
+
+ @Override
+ public void begin(TxnType type);
/** Attempt to promote a read transaction to a write transaction.
* This is not guaranteed to succeed - any changes by another write transaction
- * may restrict promotion.
+ * may restrict promotion. This depends on the transaction type as to whether
+ * it is "read committed" or not.
* <p>
- * In the MR+SW implementation, any intervening write transaction will block promotion.
+ * If not "read committed", any intervening write transaction will block promotion.
+ * Otherwise, at the point of promotion, changes by other writers become visible.
* <p>
* Promoting a transaction which is already a write transaction will return true.
- * <p>
- * Consider also:
- * <pre>
- * .end() ;
- * .begin(WRITE) ;
- * </pre>
- * to see any intermediate commits from another writer.
*
* @return boolean indicating whether the transaction is now a write transaction or not.
*/
@Override
- public boolean promote() ;
+ public boolean promote();
/** Commit a transaction - finish the transaction and make any changes permanent (if a "write" transaction) */
@Override
- public void commit() ;
+ public void commit();
/** Abort a transaction - finish the transaction and undo any changes (if a "write" transaction) */
@Override
- public void abort() ;
+ public void abort();
/** Finish the transaction - if a write transaction and commit() has not been called, then abort. */
@Override
- public void end() ;
+ public void end();
+
+ /** Return the current mode of the transaction - "read" or "write" */
+ @Override
+ public ReadWrite transactionMode();
+
+ /** Return the transaction type used in {@code begin(TxnType)}. */
+ @Override
+ public TxnType transactionType();
/** Say whether inside a transaction. */
@Override
- public boolean isInTransaction() ;
+ public boolean isInTransaction();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransactionalMonitor.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransactionalMonitor.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransactionalMonitor.java
index df7b233..6b05410 100644
--- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransactionalMonitor.java
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransactionalMonitor.java
@@ -18,12 +18,12 @@
package org.apache.jena.dboe.transaction;
-import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.query.TxnType;
-/** Interface for the Transactional interface */
+/** Interface for the Transactional transitions. */
public interface TransactionalMonitor {
- default void startBegin(ReadWrite mode) {}
- default void finishBegin(ReadWrite mode) {}
+ default void startBegin(TxnType txnType) {}
+ default void finishBegin(TxnType txnType) {}
default void startPromote() {}
default void finishPromote() {}
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/QuorumGenerator.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/QuorumGenerator.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/QuorumGenerator.java
index d70322d..34254f7 100644
--- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/QuorumGenerator.java
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/QuorumGenerator.java
@@ -18,11 +18,11 @@
package org.apache.jena.dboe.transaction.txn;
-import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.query.TxnType;
-/** Generate a transaction quorum for a transaction as it begins */
+/** Generate a transaction quorum for a transaction as it begins */
public interface QuorumGenerator {
- public ComponentGroup genQuorum(ReadWrite mode) ;
+ public ComponentGroup genQuorum(TxnType txnType) ;
}
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/Transaction.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/Transaction.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/Transaction.java
index fe6b2a9..d523e74 100644
--- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/Transaction.java
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/Transaction.java
@@ -33,6 +33,7 @@ import java.util.Objects ;
import java.util.concurrent.atomic.AtomicReference ;
import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.query.TxnType;
/**
* A transaction as the composition of actions on components.
@@ -58,25 +59,27 @@ public class Transaction implements TransactionInfo {
// It also allow for multithreaded transactions (later).
private final AtomicReference<TxnState> state = new AtomicReference<>() ;
//private TxnState state ;
- private long dataVersion ;
+ private final long dataVersion ;
+ private final TxnType txnType ;
private ReadWrite mode ;
- public Transaction(TransactionCoordinator txnMgr, TxnId txnId, ReadWrite readWrite, long dataVersion, List<SysTrans> components) {
+ public Transaction(TransactionCoordinator txnMgr, TxnType txnType, ReadWrite readWrite, TxnId txnId, long dataVersion, List<SysTrans> components) {
Objects.requireNonNull(txnMgr) ;
Objects.requireNonNull(txnId) ;
Objects.requireNonNull(readWrite) ;
Objects.requireNonNull(components) ;
this.txnMgr = txnMgr ;
this.txnId = txnId ;
+ this.txnType = txnType ;
this.mode = readWrite ;
this.dataVersion = dataVersion ;
this.components = components ;
setState(INACTIVE) ;
}
- /*package*/ void resetDataVersion(long dataVersion) {
- this.dataVersion = dataVersion;
- }
+// /*package*/ void resetDataVersion(long dataVersion) {
+// this.dataVersion = dataVersion;
+// }
/*package*/ void setState(TxnState newState) {
state.set(newState) ;
@@ -139,7 +142,7 @@ public class Transaction implements TransactionInfo {
}
public void commit() {
- // Split into READ and WRITE forms.
+ // XXX Split into READ and WRITE forms.
TxnState s = getState();
if ( s == ACTIVE )
// Auto exec prepare().
@@ -242,10 +245,13 @@ public class Transaction implements TransactionInfo {
}
@Override
- public TxnId getTxnId() { return txnId ; }
+ public TxnId getTxnId() { return txnId ; }
+
+ @Override
+ public TxnType getTxnType() { return txnType ; }
@Override
- public ReadWrite getMode() { return mode ; }
+ public ReadWrite getMode() { return mode ; }
/** Is this a READ transaction?
* Convenience operation equivalent to {@code (getMode() == READ)}
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
index 69f9b8e..92510c5 100644
--- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
@@ -19,7 +19,6 @@
package org.apache.jena.dboe.transaction.txn;
import static org.apache.jena.dboe.transaction.txn.journal.JournalEntryType.UNDO;
-import static org.apache.jena.query.ReadWrite.WRITE ;
import java.nio.ByteBuffer ;
import java.util.ArrayList ;
@@ -32,12 +31,14 @@ import java.util.concurrent.atomic.AtomicLong ;
import java.util.concurrent.locks.ReadWriteLock ;
import java.util.concurrent.locks.ReentrantReadWriteLock ;
+import org.apache.jena.atlas.lib.InternalErrorException;
import org.apache.jena.atlas.logging.Log ;
import org.apache.jena.dboe.base.file.Location;
import org.apache.jena.dboe.sys.Sys;
import org.apache.jena.dboe.transaction.txn.journal.Journal;
import org.apache.jena.dboe.transaction.txn.journal.JournalEntry;
import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.query.TxnType;
import org.slf4j.Logger ;
/**
@@ -446,23 +447,26 @@ public class TransactionCoordinator {
}
/** Start a transaction. This may block. */
- public Transaction begin(ReadWrite readWrite) {
- return begin(readWrite, true) ;
+ public Transaction begin(TxnType txnType) {
+ return begin(txnType, true) ;
}
/**
- * Start a transaction. Returns null if this operation would block.
+ * Start a transaction.
+ * Returns null if this operation would block.
* Readers can start at any time.
- * A single writer policy is currently imposed so a "begin(WRITE)"
- * may block.
+ * A single writer policy is currently imposed so a "begin(WRITE)" may block.
*/
- public Transaction begin(ReadWrite readWrite, boolean canBlock) {
- Objects.nonNull(readWrite) ;
+ public Transaction begin(TxnType txnType, boolean canBlock) {
+ Objects.nonNull(txnType) ;
checkActive() ;
- // XXX Flag to bounce writers fpor long term "block writers"
+ // XXX Flag to bounce writers for long term "block writers"
if ( false /* bounceWritersAtTheMoment */) {
- if ( readWrite == WRITE ) {
+ // Is this still needed?
+ // Switching happens as copy, not in-place compaction (at the moment).
+ // so we don't need a write-reject mode currently.
+ if ( txnType == TxnType.WRITE ) {
throw new TransactionException("Writers currently being rejected");
}
}
@@ -475,7 +479,7 @@ public class TransactionCoordinator {
}
// Readers never block.
- if ( readWrite == WRITE ) {
+ if ( txnType == TxnType.WRITE ) {
// Writers take a WRITE permit from the semaphore to ensure there
// is at most one active writer, else the attempt to start the
// transaction blocks.
@@ -486,7 +490,7 @@ public class TransactionCoordinator {
return null ;
}
}
- Transaction transaction = begin$(readWrite) ;
+ Transaction transaction = begin$(txnType) ;
startActiveTransaction(transaction) ;
transaction.begin();
return transaction;
@@ -507,15 +511,15 @@ public class TransactionCoordinator {
*/
private final AtomicLong dataVersion = new AtomicLong(0) ;
- private Transaction begin$(ReadWrite readWrite) {
+ private Transaction begin$(TxnType txnType) {
synchronized(coordinatorLock) {
// Thread safe part of 'begin'
// Allocate the transaction serialization point.
TxnId txnId = txnIdGenerator.generate() ;
List<SysTrans> sysTransList = new ArrayList<>() ;
- Transaction transaction = new Transaction(this, txnId, readWrite, dataVersion.get(), sysTransList) ;
+ Transaction transaction = new Transaction(this, txnType, initialMode(txnType), txnId, dataVersion.get(), sysTransList) ;
- ComponentGroup txnComponents = chooseComponents(this.components, readWrite) ;
+ ComponentGroup txnComponents = chooseComponents(this.components, txnType) ;
try {
txnComponents.forEachComponent(elt -> {
@@ -536,10 +540,15 @@ public class TransactionCoordinator {
}
}
- private ComponentGroup chooseComponents(ComponentGroup components, ReadWrite readWrite) {
+ // Determine the initial ReadWrite mode of a transaction from its TxnType.
+ private static ReadWrite initialMode(TxnType txnType) {
+ return (txnType == TxnType.WRITE) ? ReadWrite.WRITE : ReadWrite.READ;
+ }
+
+ private ComponentGroup chooseComponents(ComponentGroup components, TxnType txnType) {
if ( quorumGenerator == null )
return components ;
- ComponentGroup cg = quorumGenerator.genQuorum(readWrite) ;
+ ComponentGroup cg = quorumGenerator.genQuorum(txnType) ;
if ( cg == null )
return components ;
cg.forEach((id, c) -> {
@@ -548,33 +557,34 @@ public class TransactionCoordinator {
log.warn("TransactionalComponent not in TransactionCoordinator's ComponentGroup") ;
}) ;
if ( log.isDebugEnabled() )
- log.debug("Custom ComponentGroup for transaction "+readWrite+": size="+cg.size()+" of "+components.size()) ;
+ log.debug("Custom ComponentGroup for transaction "+txnType+": size="+cg.size()+" of "+components.size()) ;
return cg ;
}
- /** Is promotion of transactions enabled? */
- /*private*/public/*for development*/ static boolean promotion = true ;
-
- /** Control of whether a transaction promotion can see any commits that
- * happened between this transaction starting and it promoting.
- * A form of "ReadCommitted".
- */
- /*private*/public/*for development*/ static boolean readCommittedPromotion = false ;
+// /** Is promotion of transactions enabled? */
+// /*private*/public/*for development*/ static boolean promotion = true ;
+//
+// /** Control of whether a transaction promotion can see any commits that
+// * happened between this transaction starting and it promoting.
+// * A form of "ReadCommitted".
+// */
+// /*private*/public/*for development*/ static boolean readCommittedPromotion = false ;
/** Whether to wait for writers when trying to promote */
private static final boolean promotionWaitForWriters = true;
/** Attempt to promote a transaction from READ to WRITE.
- * No-op for a transaction already a writer.
- * Throws {@link TransactionException} if the promotion
- * can not be done.
+ * Returns true, as a no-op, for a transaction that is already a writer.
*/
/*package*/ boolean promoteTxn(Transaction transaction) {
- if ( ! promotion )
- return false;
-
- if ( transaction.getMode() == WRITE )
+ if ( transaction.getMode() == ReadWrite.WRITE )
return true ;
+ if ( transaction.getTxnType() == TxnType.READ )
+ throw new TransactionException("promote: can't promote a READ transaction") ;
+
+ if ( transaction.getTxnType() != TxnType.READ_COMMITTED_PROMOTE &&
+ transaction.getTxnType() != TxnType.READ_PROMOTE )
+ throw new InternalErrorException("Transaction type is "+transaction.getTxnType());
// Has there been an writer active since the transaction started?
// Do a test outside the lock - only dataVaersion can change and that increases.
@@ -583,20 +593,14 @@ public class TransactionCoordinator {
// If this test fails outside the lock it will fail inside.
// If it passes, we have to test again in case there is an active writer.
- if ( ! readCommittedPromotion ) {
- long txnEpoch = transaction.getDataVersion() ; // The transaction-start point.
- long currentEpoch = dataVersion.get() ; // The data serialization point.
-
- if ( txnEpoch < currentEpoch )
- // The data has changed and "read committed" not allowed.
- // We can reject now.
- return false ;
- }
+ //boolean readCommittedPromotion = transaction.getTxnType() == TxnType.READ_COMMITTED_PROMOTE;
// Once we have acquireWriterLock, we are single writer.
// We may have to discard writer status because eocne we can make the defintite
// decision on promotion, we find we can't promote after all.
- if ( readCommittedPromotion ) {
+
+ // == Read committed path.
+ if ( transaction.getTxnType() == TxnType.READ_COMMITTED_PROMOTE ) {
/*
* acquireWriterLock(true) ;
* synchronized(coordinatorLock) {
@@ -609,8 +613,8 @@ public class TransactionCoordinator {
synchronized(coordinatorLock) {
try {
transaction.promoteComponents() ;
- // Because we want to see the new state of the data.s
- //transaction.resetDataVersion(dataVersion.get());
+ // Because we want to see the new state of the data.
+ // transaction.resetDataVersion(dataVersion.get());
} catch (TransactionException ex) {
try { transaction.abort(); } catch(RuntimeException ex2) {}
releaseWriterLock();
@@ -620,20 +624,27 @@ public class TransactionCoordinator {
return true;
}
+ // == Read with no committed allowed
+ // Check epoch is current - no "read committed".
+ // Check now outside synchronized (will need to check again to confirm) for speed
+ // and to allow for "no wait for writes".
+
+ if ( ! checkNoInterveningCommits(transaction) )
+ return false;
+
+ // Take writer lock.
if ( ! waitForWriters() )
// Failed to become a writer.
return false;
- // Now a proto-writer.
+ // Now a proto-writer. We need to confirm when inside the synchronized.
synchronized(coordinatorLock) {
// Not read commited.
// Need to check the data version once we are the writer and all previous
// writers have commited or aborted.
// Has there been an writer active since the transaction started?
- long txnEpoch = transaction.getDataVersion() ; // The transaction-start point.
- long currentEpoch = dataVersion.get() ; // The data serialization point.
-
- if ( txnEpoch != currentEpoch ) {
+
+ if ( ! checkNoInterveningCommits(transaction) ) {
// Failed to promote.
releaseWriterLock();
return false ;
@@ -652,6 +663,17 @@ public class TransactionCoordinator {
return true ;
}
+ private boolean checkNoInterveningCommits(Transaction transaction) {
+ long txnEpoch = transaction.getDataVersion() ; // The transaction-start point.
+ long currentEpoch = dataVersion.get() ; // The current data serialization point.
+
+ if ( txnEpoch < currentEpoch )
+ // The data has changed and "read committed" not allowed.
+ // We can reject now.
+ return false ;
+ return true;
+ }
+
private boolean waitForWriters() {
if ( promotionWaitForWriters )
return acquireWriterLock(true) ;
@@ -680,8 +702,14 @@ public class TransactionCoordinator {
notifyPrepareFinish(transaction);
}
- /*package*/ void executeCommit(Transaction transaction, Runnable commit, Runnable finish) {
- // This is the commit point.
+ /*package*/ void executeCommit(Transaction transaction, Runnable commit, Runnable finish) {
+ if ( transaction.getMode() == ReadWrite.READ ) {
+ finish.run();
+ notifyCommitFinish(transaction);
+ return;
+ }
+
+ // This is the commit for a write transaction
synchronized(coordinatorLock) {
// *** COMMIT POINT
journal.sync() ;
@@ -691,9 +719,8 @@ public class TransactionCoordinator {
journal.truncate(0) ;
// and tell the Transaction it's finished.
finish.run() ;
- // Bump global serialization point if necessary.
- if ( transaction.getMode() == WRITE )
- advanceDataVersion() ;
+ // Bump global serialization point
+ advanceDataVersion() ;
notifyCommitFinish(transaction) ;
}
}
@@ -763,14 +790,14 @@ public class TransactionCoordinator {
private void notifyCommitStart(Transaction transaction) {}
private void notifyCommitFinish(Transaction transaction) {
- if ( transaction.getMode() == WRITE )
+ if ( transaction.getMode() == ReadWrite.WRITE )
releaseWriterLock();
}
private void notifyAbortStart(Transaction transaction) { }
private void notifyAbortFinish(Transaction transaction) {
- if ( transaction.getMode() == WRITE )
+ if ( transaction.getMode() == ReadWrite.WRITE )
releaseWriterLock();
}
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionInfo.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionInfo.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionInfo.java
index ee31723..9ab179b 100644
--- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionInfo.java
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionInfo.java
@@ -21,6 +21,7 @@ package org.apache.jena.dboe.transaction.txn;
import static org.apache.jena.dboe.transaction.txn.TxnState.INACTIVE;
import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.query.TxnType;
/**
* A view that provides information about a transaction
@@ -51,18 +52,24 @@ public interface TransactionInfo {
/** Get the transaction id for this transaction. Unique within this OS process (JVM) at least . */
public TxnId getTxnId() ;
+ /**
+ * What type is this transaction? This is the initial TxnType
+ * and does not change during a transaction's lifetime.
+ */
+ public TxnType getTxnType() ;
+
/** What mode is this transaction?
* This may change from {@code READ} to {@code WRITE} in a transactions lifetime.
*/
public ReadWrite getMode() ;
- /** Is this a view of a READ transaction?
- * Convenience operation equivalent to {@code (getMode() == READ)}
+ /** Is this currently a READ transaction? Promotion may change the mode.
+ * Convenience operation equivalent to {@code (getMode() == ReadWrite.READ)}
*/
public default boolean isReadTxn() { return getMode() == ReadWrite.READ ; }
- /** Is this a view of a WRITE transaction?
- * Convenience operation equivalent to {@code (getMode() == WRITE)}
+ /** Is this currently a WRITE transaction?
+ * Convenience operation equivalent to {@code (getMode() == ReadWrite.WRITE)}
*/
public default boolean isWriteTxn() { return getMode() == ReadWrite.WRITE ; }
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java
index 30364d0..d4fd8f5 100644
--- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java
@@ -22,6 +22,7 @@ import java.util.Objects ;
import org.apache.jena.atlas.logging.Log ;
import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.query.TxnType;
/**
* Framework for implementing a Transactional.
@@ -94,11 +95,11 @@ public class TransactionalBase implements TransactionalSystem {
}
@Override
- public final void begin(ReadWrite readWrite) {
- Objects.nonNull(readWrite) ;
+ public final void begin(TxnType txnType) {
+ Objects.nonNull(txnType) ;
checkRunning() ;
checkNotActive() ;
- Transaction transaction = txnMgr.begin(readWrite) ;
+ Transaction transaction = txnMgr.begin(txnType) ;
theTxn.set(transaction) ;
}
@@ -152,19 +153,36 @@ public class TransactionalBase implements TransactionalSystem {
_end() ;
}
- /**
- * Return the Read/write state (or null when not in a transaction)
- */
@Override
- final
- public ReadWrite getState() {
+ public ReadWrite transactionMode() {
+ checkRunning() ;
+ Transaction txn = getTxn() ;
+ if ( txn != null )
+ return txn.getMode() ;
+ return null ;
+ }
+
+ @Override
+ public TxnType transactionType() {
checkRunning() ;
+ Transaction txn = getTxn() ;
+ if ( txn != null )
+ return txn.getTxnType() ;
+ return null ;
+ }
+
+ @Override
+ public boolean isInTransaction() {
+ return getTxn() != null;
+ }
+
+ private Transaction getTxn() {
// tricky - touching theTxn causes it to initialize.
Transaction txn = theTxn.get() ;
if ( txn != null )
- return txn.getMode() ;
+ return txn;
theTxn.remove() ;
- return null ;
+ return null ;
}
@Override
@@ -177,6 +195,7 @@ public class TransactionalBase implements TransactionalSystem {
final
public Transaction getThreadTransaction() {
Transaction txn = theTxn.get() ;
+ // XXX Use getTxn() ??
// Touched the thread local so it is defined now.
// if ( txn == null )
// theTxn.remove() ;
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalSystem.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalSystem.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalSystem.java
index cca13bb..c4448e1 100644
--- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalSystem.java
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalSystem.java
@@ -19,7 +19,6 @@
package org.apache.jena.dboe.transaction.txn;
import org.apache.jena.dboe.transaction.Transactional;
-import org.apache.jena.query.ReadWrite ;
/** Implementation side of a {@link Transactional}.
* {@link Transactional} presents the application facing view
@@ -61,14 +60,6 @@ public interface TransactionalSystem extends Transactional {
/** Get the associated {@link TransactionCoordinator} */
public TransactionCoordinator getTxnMgr() ;
- /** Return the Read/Write state from the point of view of the caller.
- * Return null when not in a transaction.
- */
- public ReadWrite getState() ;
-
- @Override
- public default boolean isInTransaction() { return getState() != null ; }
-
/** Return an information view of the transaction for this thread, if any.
* Returns null when there is no active transaction for this tread.
*/
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionCoordinatorControl.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionCoordinatorControl.java b/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionCoordinatorControl.java
index 977a45c..f99b37a 100644
--- a/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionCoordinatorControl.java
+++ b/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionCoordinatorControl.java
@@ -18,25 +18,21 @@
package org.apache.jena.dboe.transaction;
-import static org.junit.Assert.assertEquals ;
-import static org.junit.Assert.assertFalse ;
-import static org.junit.Assert.assertNotNull ;
-import static org.junit.Assert.assertNull ;
-import static org.junit.Assert.assertTrue ;
+import static org.junit.Assert.*;
import java.util.concurrent.Semaphore ;
import java.util.concurrent.atomic.AtomicInteger ;
-import org.apache.jena.system.ThreadTxn;
-import org.apache.jena.system.ThreadAction;
+
import org.apache.jena.dboe.base.file.Location;
import org.apache.jena.dboe.jenax.Txn;
import org.apache.jena.dboe.migrate.L;
-import org.apache.jena.dboe.transaction.Transactional;
import org.apache.jena.dboe.transaction.txn.Transaction;
import org.apache.jena.dboe.transaction.txn.TransactionCoordinator;
import org.apache.jena.dboe.transaction.txn.TransactionException;
import org.apache.jena.dboe.transaction.txn.TransactionalBase;
-import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.query.TxnType;
+import org.apache.jena.system.ThreadAction;
+import org.apache.jena.system.ThreadTxn;
import org.junit.After ;
import org.junit.Before ;
import org.junit.Test ;
@@ -72,21 +68,21 @@ public class TestTransactionCoordinatorControl {
@Test public void txn_coord_disable_writers_2() {
txnMgr.blockWriters();
- Transaction txn = L.syncCallThread(()->txnMgr.begin(ReadWrite.WRITE, false)) ;
+ Transaction txn = L.syncCallThread(()->txnMgr.begin(TxnType.WRITE, false)) ;
assertNull(txn) ;
txnMgr.enableWriters();
- Transaction txn1 = L.syncCallThread(()->txnMgr.begin(ReadWrite.WRITE, false)) ;
+ Transaction txn1 = L.syncCallThread(()->txnMgr.begin(TxnType.WRITE, false)) ;
assertNotNull(txn1) ;
}
@Test public void txn_coord_disable_writers_3() {
txnMgr.blockWriters();
- Transaction txn = L.syncCallThread(()->txnMgr.begin(ReadWrite.READ, false)) ;
+ Transaction txn = L.syncCallThread(()->txnMgr.begin(TxnType.READ, false)) ;
assertNotNull(txn) ;
txnMgr.enableWriters();
- Transaction txn1 = L.syncCallThread(()->txnMgr.begin(ReadWrite.WRITE, false)) ;
+ Transaction txn1 = L.syncCallThread(()->txnMgr.begin(TxnType.WRITE, false)) ;
assertNotNull(txn1) ;
- Transaction txn2 = L.syncCallThread(()->txnMgr.begin(ReadWrite.READ, false)) ;
+ Transaction txn2 = L.syncCallThread(()->txnMgr.begin(TxnType.READ, false)) ;
assertNotNull(txn2) ;
}
@@ -108,17 +104,17 @@ public class TestTransactionCoordinatorControl {
@Test public void txn_coord_exclusive_1() {
txnMgr.startExclusiveMode();
L.syncOtherThread(()->{
- Transaction txn1 = txnMgr.begin(ReadWrite.WRITE, false) ;
+ Transaction txn1 = txnMgr.begin(TxnType.WRITE, false) ;
assertNull(txn1) ;
- Transaction txn2 = txnMgr.begin(ReadWrite.READ, false) ;
+ Transaction txn2 = txnMgr.begin(TxnType.READ, false) ;
assertNull(txn2) ;
}) ;
txnMgr.finishExclusiveMode();
L.syncOtherThread(()->{
- Transaction txn1 = txnMgr.begin(ReadWrite.WRITE, false) ;
+ Transaction txn1 = txnMgr.begin(TxnType.WRITE, false) ;
assertNotNull(txn1) ;
- Transaction txn2 = txnMgr.begin(ReadWrite.READ, false) ;
+ Transaction txn2 = txnMgr.begin(TxnType.READ, false) ;
assertNotNull(txn2) ;
}) ;
}
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionLifecycle2.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionLifecycle2.java b/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionLifecycle2.java
index 6875ac4..0e6c29d 100644
--- a/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionLifecycle2.java
+++ b/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionLifecycle2.java
@@ -28,7 +28,8 @@ import org.apache.jena.dboe.transaction.txn.Transaction;
import org.apache.jena.dboe.transaction.txn.TransactionCoordinator;
import org.apache.jena.dboe.transaction.txn.TransactionException;
import org.apache.jena.dboe.transaction.txn.journal.Journal;
-import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.query.ReadWrite;
+import org.apache.jena.query.TxnType;
import org.junit.After ;
import org.junit.Before ;
import org.junit.Test ;
@@ -62,21 +63,21 @@ public class TestTransactionLifecycle2 {
}
@Test public void txn_direct_01() {
- Transaction txn1 = txnMgr.begin(ReadWrite.READ) ;
+ Transaction txn1 = txnMgr.begin(TxnType.READ) ;
txn1.end();
checkClear() ;
}
@Test(expected=TransactionException.class)
public void txn_direct_02() {
- Transaction txn1 = txnMgr.begin(ReadWrite.WRITE) ;
+ Transaction txn1 = txnMgr.begin(TxnType.WRITE) ;
txn1.end();
checkClear() ;
}
@Test
public void txn_direct_03() {
- Transaction txn1 = txnMgr.begin(ReadWrite.WRITE) ;
+ Transaction txn1 = txnMgr.begin(TxnType.WRITE) ;
txn1.commit() ;
txn1.end() ;
checkClear() ;
@@ -84,13 +85,13 @@ public class TestTransactionLifecycle2 {
@Test
public void txn_direct_04() {
- Transaction txn1 = txnMgr.begin(ReadWrite.WRITE) ;
+ Transaction txn1 = txnMgr.begin(TxnType.WRITE) ;
// This tests the TransactionCoordinator
// but the TransactiolComponentLifecycle doesn't support multiple
// transactions per thread (use of ThreadLocals).
// To do that, the transaction object would be needed in all
// component API calls. Doable but intrusive.
- Transaction txn2 = txnMgr.begin(ReadWrite.READ) ;
+ Transaction txn2 = txnMgr.begin(TxnType.READ) ;
txn1.commit() ;
txn2.end() ;
txn1.end() ;
@@ -99,7 +100,7 @@ public class TestTransactionLifecycle2 {
@Test
public void txn_direct_05() {
- Transaction txn1 = txnMgr.begin(ReadWrite.WRITE) ;
+ Transaction txn1 = txnMgr.begin(TxnType.WRITE) ;
txn1.prepare() ;
txn1.commit() ;
txn1.end() ;
@@ -108,7 +109,7 @@ public class TestTransactionLifecycle2 {
@Test
public void txn_direct_06() {
- Transaction txn1 = txnMgr.begin(ReadWrite.WRITE) ;
+ Transaction txn1 = txnMgr.begin(TxnType.WRITE) ;
// txn1.prepare() ; Optional.
txn1.commit() ;
txn1.end() ;
@@ -117,10 +118,10 @@ public class TestTransactionLifecycle2 {
@Test
public void txn_overlap_WW() {
- Transaction txn1 = txnMgr.begin(ReadWrite.WRITE, false) ;
+ Transaction txn1 = txnMgr.begin(TxnType.WRITE, false) ;
assertNotNull(txn1) ;
- Transaction txn2 = txnMgr.begin(ReadWrite.WRITE, false) ;
+ Transaction txn2 = txnMgr.begin(TxnType.WRITE, false) ;
assertNull(txn2) ; // Otherwise blocking.
txn1.commit();
@@ -130,10 +131,10 @@ public class TestTransactionLifecycle2 {
@Test
public void txn_overlap_WR() {
- Transaction txn1 = txnMgr.begin(ReadWrite.WRITE, false) ;
+ Transaction txn1 = txnMgr.begin(TxnType.WRITE, false) ;
assertNotNull(txn1) ;
- Transaction txn2 = txnMgr.begin(ReadWrite.READ, false) ;
+ Transaction txn2 = txnMgr.begin(TxnType.READ, false) ;
assertNotNull(txn2) ;
txn1.commit();
@@ -144,10 +145,10 @@ public class TestTransactionLifecycle2 {
@Test
public void txn_overlap_RW() {
- Transaction txn1 = txnMgr.begin(ReadWrite.READ, false) ;
+ Transaction txn1 = txnMgr.begin(TxnType.READ, false) ;
assertNotNull(txn1) ;
- Transaction txn2 = txnMgr.begin(ReadWrite.WRITE, false) ;
+ Transaction txn2 = txnMgr.begin(TxnType.WRITE, false) ;
assertNotNull(txn2) ;
txn1.commit();
txn1.end() ;
@@ -158,10 +159,10 @@ public class TestTransactionLifecycle2 {
@Test
public void txn_overlap_RR() {
- Transaction txn1 = txnMgr.begin(ReadWrite.READ, false) ;
+ Transaction txn1 = txnMgr.begin(TxnType.READ, false) ;
assertNotNull(txn1) ;
- Transaction txn2 = txnMgr.begin(ReadWrite.READ, false) ;
+ Transaction txn2 = txnMgr.begin(TxnType.READ, false) ;
assertNotNull(txn2) ;
txn1.commit();
@@ -172,7 +173,7 @@ public class TestTransactionLifecycle2 {
@Test
public void txn_promote_1() {
- Transaction txn1 = txnMgr.begin(ReadWrite.READ) ;
+ Transaction txn1 = txnMgr.begin(TxnType.READ_PROMOTE) ;
assertNotNull(txn1) ;
boolean b = txn1.promote() ;
assertTrue(b) ;
@@ -184,7 +185,7 @@ public class TestTransactionLifecycle2 {
@Test
public void txn_promote_2() {
- Transaction txn1 = txnMgr.begin(ReadWrite.WRITE) ;
+ Transaction txn1 = txnMgr.begin(TxnType.WRITE) ;
boolean b = txn1.promote() ;
assertTrue(b) ;
b = txn1.promote() ;
@@ -196,7 +197,7 @@ public class TestTransactionLifecycle2 {
@Test(expected=TransactionException.class)
public void txn_promote_3() {
- Transaction txn1 = txnMgr.begin(ReadWrite.READ) ;
+ Transaction txn1 = txnMgr.begin(TxnType.READ) ;
boolean b = txn1.promote() ;
assertTrue(b) ;
b = txn1.promote() ;
@@ -208,8 +209,8 @@ public class TestTransactionLifecycle2 {
//Not a @Test
public void txn_promote_deadlock() {
- Transaction txn1 = txnMgr.begin(ReadWrite.READ) ;
- Transaction txn2 = txnMgr.begin(ReadWrite.WRITE) ;
+ Transaction txn1 = txnMgr.begin(TxnType.READ) ;
+ Transaction txn2 = txnMgr.begin(TxnType.WRITE) ;
// Deadlock.
// Promotion waits for the writer to decide whether it is commiting or not.
// This can't be done on one thread.
@@ -223,9 +224,9 @@ public class TestTransactionLifecycle2 {
@Test
public void txn_promote_thread_writer_1() {
- Transaction txn1 = txnMgr.begin(ReadWrite.READ) ;
+ Transaction txn1 = txnMgr.begin(TxnType.READ_PROMOTE) ;
L.syncOtherThread(()->{
- Transaction txn2 = txnMgr.begin(ReadWrite.WRITE) ;
+ Transaction txn2 = txnMgr.begin(TxnType.WRITE) ;
txn2.commit();
txn2.end() ;
}) ;
@@ -238,9 +239,9 @@ public class TestTransactionLifecycle2 {
@Test
public void txn_promote_thread_writer_2() {
- Transaction txn1 = txnMgr.begin(ReadWrite.READ) ;
+ Transaction txn1 = txnMgr.begin(TxnType.READ_PROMOTE) ;
L.syncOtherThread(()->{
- Transaction txn2 = txnMgr.begin(ReadWrite.WRITE) ;
+ Transaction txn2 = txnMgr.begin(TxnType.WRITE) ;
txn2.abort();
txn2.end() ;
}) ;
@@ -255,13 +256,13 @@ public class TestTransactionLifecycle2 {
@Test
public void txn_promote_thread_writer_3() {
- Transaction txn1 = txnMgr.begin(ReadWrite.READ) ;
+ Transaction txn1 = txnMgr.begin(TxnType.READ_PROMOTE) ;
boolean b = txn1.promote() ;
assertTrue(b) ;
AtomicReference<Transaction> ref = new AtomicReference<>(txn1) ;
L.syncOtherThread(()->{
// Should fail.
- Transaction txn2 = txnMgr.begin(ReadWrite.WRITE, false) ;
+ Transaction txn2 = txnMgr.begin(TxnType.WRITE, false) ;
ref.set(txn2);
}) ;
assertNull(ref.get()) ;
@@ -271,13 +272,13 @@ public class TestTransactionLifecycle2 {
@Test
public void txn_promote_thread_writer_4() {
- Transaction txn1 = txnMgr.begin(ReadWrite.READ) ;
+ Transaction txn1 = txnMgr.begin(TxnType.READ_PROMOTE) ;
boolean b = txn1.promote() ;
assertTrue(b) ;
AtomicReference<Transaction> ref = new AtomicReference<>(txn1) ;
L.syncOtherThread(()->{
// Should fail.
- Transaction txn2 = txnMgr.begin(ReadWrite.WRITE, false) ;
+ Transaction txn2 = txnMgr.begin(TxnType.WRITE, false) ;
ref.set(txn2);
}) ;
assertNull(ref.get()) ;
@@ -286,7 +287,7 @@ public class TestTransactionLifecycle2 {
L.syncOtherThread(()->{
// Should suceed
- Transaction txn2 = txnMgr.begin(ReadWrite.WRITE, false) ;
+ Transaction txn2 = txnMgr.begin(TxnType.WRITE, false) ;
ref.set(txn2);
txn2.abort() ;
txn2.end() ;
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnSwitching.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnSwitching.java b/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnSwitching.java
index 96f1e9c..d32fd55 100644
--- a/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnSwitching.java
+++ b/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTxnSwitching.java
@@ -23,12 +23,12 @@ import static org.junit.Assert.fail ;
import org.apache.jena.dboe.base.file.Location;
import org.apache.jena.dboe.jenax.Txn;
-import org.apache.jena.system.ThreadTxn;
-import org.apache.jena.system.ThreadAction;
-import org.apache.jena.dboe.transaction.TransInteger;
import org.apache.jena.dboe.transaction.txn.*;
import org.apache.jena.dboe.transaction.txn.journal.Journal;
import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.query.TxnType;
+import org.apache.jena.system.ThreadAction;
+import org.apache.jena.system.ThreadTxn;
import org.junit.After ;
import org.junit.Before ;
import org.junit.Test ;
@@ -56,7 +56,7 @@ public class TestTxnSwitching {
@Test public void txnSwitch_01() {
long z = integer.value() ;
- transactional.begin(ReadWrite.WRITE);
+ transactional.begin(TxnType.WRITE);
integer.inc();
assertEquals(integer.value()+1, integer.get()) ;
@@ -113,12 +113,12 @@ public class TestTxnSwitching {
Txn.executeWrite(transactional, ()->integer.inc());
assertEquals(z+1, integer.value()) ;
- Transaction txn = txnMgr.begin(ReadWrite.WRITE) ;
+ Transaction txn = txnMgr.begin(TxnType.WRITE) ;
integer.inc();
assertEquals(z+2, integer.get()) ;
TransactionCoordinatorState txnState = txnMgr.detach(txn) ;
- Transaction txnRead = txnMgr.begin(ReadWrite.READ) ;
+ Transaction txnRead = txnMgr.begin(TxnType.READ) ;
assertEquals(z+1, integer.get()) ;
txnRead.end() ;
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/DatasetGraphTDB.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/DatasetGraphTDB.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/DatasetGraphTDB.java
index fa2cd47..5a0e7f7 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/DatasetGraphTDB.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/DatasetGraphTDB.java
@@ -38,6 +38,7 @@ import org.apache.jena.graph.GraphUtil ;
import org.apache.jena.graph.Node ;
import org.apache.jena.graph.Triple ;
import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.query.TxnType;
import org.apache.jena.sparql.core.* ;
import org.apache.jena.sparql.engine.optimizer.reorder.ReorderTransformation ;
import org.apache.jena.tdb2.TDBException;
@@ -176,7 +177,6 @@ public class DatasetGraphTDB extends DatasetGraphTriplesQuads
if ( txn.isWriteTxn() )
return ;
- // Transaction.promoteOrException
boolean b = txn.promote() ;
if ( !b )
throw new TransactionException("Can't write") ;
@@ -410,13 +410,28 @@ public class DatasetGraphTDB extends DatasetGraphTriplesQuads
public boolean isInTransaction() {
return txnSystem.isInTransaction() ;
}
+
+ @Override
+ public ReadWrite transactionMode() {
+ return txnSystem.transactionMode();
+ }
+
+ @Override
+ public TxnType transactionType() {
+ return txnSystem.transactionType();
+ }
// txnSystem with monitor?
@Override
+ public void begin(TxnType txnType) {
+ if ( txnMonitor != null ) txnMonitor.startBegin(txnType);
+ txnSystem.begin(txnType);
+ if ( txnMonitor != null ) txnMonitor.finishBegin(txnType);
+ }
+
+ @Override
public void begin(ReadWrite readWrite) {
- if ( txnMonitor != null ) txnMonitor.startBegin(readWrite);
- txnSystem.begin(readWrite) ;
- if ( txnMonitor != null ) txnMonitor.finishBegin(readWrite);
+ begin(TxnType.convert(readWrite));
}
@Override
@@ -424,20 +439,22 @@ public class DatasetGraphTDB extends DatasetGraphTriplesQuads
if ( txnMonitor != null ) txnMonitor.startPromote();
try {
return txnSystem.promote() ;
- } finally { if ( txnMonitor != null ) txnMonitor.finishPromote(); }
+ } finally {
+ if ( txnMonitor != null ) txnMonitor.finishPromote();
+ }
}
@Override
public void commit() {
if ( txnMonitor != null ) txnMonitor.startCommit();
- txnSystem.commit() ;
+ txnSystem.commit();
if ( txnMonitor != null ) txnMonitor.finishCommit();
}
@Override
public void abort() {
if ( txnMonitor != null ) txnMonitor.startAbort() ;
- txnSystem.abort() ;
+ txnSystem.abort();
if ( txnMonitor != null ) txnMonitor.finishAbort() ;
}
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/GraphViewSwitchable.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/GraphViewSwitchable.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/GraphViewSwitchable.java
index c64a218..deeb788 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/GraphViewSwitchable.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/GraphViewSwitchable.java
@@ -20,17 +20,17 @@ package org.apache.jena.tdb2.store;
import java.util.Map ;
-import org.apache.jena.dboe.transaction.txn.TransactionCoordinator;
import org.apache.jena.graph.Graph ;
import org.apache.jena.graph.Node ;
-import org.apache.jena.graph.TransactionHandler;
-import org.apache.jena.query.ReadWrite;
import org.apache.jena.shared.PrefixMapping ;
import org.apache.jena.shared.impl.PrefixMappingImpl ;
-import org.apache.jena.sparql.core.*;
+import org.apache.jena.sparql.core.DatasetPrefixStorage;
+import org.apache.jena.sparql.core.GraphView;
+import org.apache.jena.sparql.core.Quad;
import org.apache.jena.sparql.expr.nodevalue.NodeFunctions;
-/** A GraphView that is sensitive to {@link DatasetGraphSwitchable} switching.
+/**
+ * A GraphView that is sensitive to {@link DatasetGraphSwitchable} switching.
*/
public class GraphViewSwitchable extends GraphView {
// Fixups for GraphView
@@ -49,36 +49,14 @@ public class GraphViewSwitchable extends GraphView {
{ return new GraphViewSwitchable(dsg, Quad.unionGraph) ; }
private final DatasetGraphSwitchable dsgx;
- private final TransactionHandlerDSG transactionHandler;
protected DatasetGraphSwitchable getx() { return dsgx; }
protected GraphViewSwitchable(DatasetGraphSwitchable dsg, Node gn) {
super(dsg, gn) ;
this.dsgx = dsg;
- this.transactionHandler = new TransactionHandlerDSG(dsg);
}
@Override
- public TransactionHandler getTransactionHandler() {
- return transactionHandler;
- }
-
- // Remove when promotion is in the DatasetGraph API.
- static class TransactionHandlerDSG extends TransactionHandlerView {
- public TransactionHandlerDSG(DatasetGraph dsg) {
- super(dsg);
- }
-
- @Override
- public void begin() {
- if ( TransactionCoordinator.promotion )
- getDSG().begin(ReadWrite.READ);
- else
- getDSG().begin(ReadWrite.WRITE);
- }
- }
-
- @Override
protected PrefixMapping createPrefixMapping() {
Node gn = super.getGraphName();
if ( gn == Quad.defaultGraphNodeGenerated )
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/AbstractTestTransPromoteTDB2.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/AbstractTestTransPromoteTDB2.java b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/AbstractTestTransPromoteTDB2.java
deleted file mode 100644
index 34af067..0000000
--- a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/AbstractTestTransPromoteTDB2.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.tdb2.store;
-
-import static org.junit.Assert.fail ;
-
-import org.apache.jena.query.ReadWrite ;
-import org.apache.jena.sparql.JenaTransactionException;
-import org.apache.jena.sparql.core.DatasetGraph ;
-import org.apache.jena.sparql.transaction.AbstractTestTransPromote;
-import org.apache.log4j.Logger ;
-import org.junit.Ignore;
-import org.junit.Test ;
-
-/** Tests for transactions that start read and then promote to write */
-public abstract class AbstractTestTransPromoteTDB2 extends AbstractTestTransPromote {
-
- protected AbstractTestTransPromoteTDB2(Logger[] loggers) {
- super(loggers);
- }
-
- // Copy in Jena 3.4.0 development
- // write-end becomes an exception.
-
- @Override
- @Ignore
- @Test public void promote_snapshot_05() { }
- @Test public void promote_snapshot_05_x() { run_05_X(false) ; }
-
- @Override
- @Ignore
- @Test public void promote_readCommitted_05() { }
-
- @Test public void promote_readCommitted_05_x() { run_05_X(true) ; }
-
- private void run_05_X(boolean readCommitted) {
- //Assume.assumeTrue( ! readCommitted || supportsReadCommitted());
-
- setReadCommitted(readCommitted);
- DatasetGraph dsg = create() ;
- dsg.begin(ReadWrite.READ) ;
- dsg.add(q1) ;
-
- try {
- dsg.end() ;
- fail("begin(W);end() did not throw an exception");
- } catch ( JenaTransactionException ex) {}
-
- assertCount(0, dsg) ;
- }
-}
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestTransPromoteTDB.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestTransPromoteTDB.java b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestTransPromoteTDB.java
index 02afae5..756409a 100644
--- a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestTransPromoteTDB.java
+++ b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestTransPromoteTDB.java
@@ -19,13 +19,13 @@
package org.apache.jena.tdb2.store;
import org.apache.jena.dboe.transaction.txn.Transaction;
-import org.apache.jena.dboe.transaction.txn.TransactionCoordinator;
import org.apache.jena.dboe.transaction.txn.TransactionException;
import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.transaction.AbstractTestTransPromote;
import org.apache.jena.tdb2.DatabaseMgr;
import org.apache.log4j.Logger;
-public class TestTransPromoteTDB extends AbstractTestTransPromoteTDB2 {
+public class TestTransPromoteTDB extends AbstractTestTransPromote {
public TestTransPromoteTDB() {
super(getLoggers());
@@ -36,26 +36,6 @@ public class TestTransPromoteTDB extends AbstractTestTransPromoteTDB2 {
}
@Override
- protected void setPromotion(boolean b) {
- TransactionCoordinator.promotion = b ;
- }
-
- @Override
- protected boolean getPromotion() {
- return TransactionCoordinator.promotion ;
- }
-
- @Override
- protected void setReadCommitted(boolean b) {
- TransactionCoordinator.readCommittedPromotion = b ;
- }
-
- @Override
- protected boolean getReadCommitted() {
- return TransactionCoordinator.readCommittedPromotion ;
- }
-
- @Override
protected Class<? extends Exception> getTransactionExceptionClass() {
return TransactionException.class;
}
@@ -64,9 +44,4 @@ public class TestTransPromoteTDB extends AbstractTestTransPromoteTDB2 {
protected DatasetGraph create() {
return DatabaseMgr.createDatasetGraph();
}
-
- @Override
- protected boolean supportsReadCommitted() {
- return true;
- }
}
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
index fd09d3f..ad01342 100644
--- a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
+++ b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
@@ -357,6 +357,8 @@ public class TransactionManager
throw new TDBTransactionException("promote: transaction is not active") ;
if ( txn.getMode() == ReadWrite.WRITE )
return dsgtxn ;
+ if ( txn.getTxnType() == TxnType.READ )
+ throw new TDBTransactionException("promote: transaction is a read transaction") ;
// Read commit - pick up whatever is current at the point setup.
// Can also promote - may need to wait for active writers.
http://git-wip-us.apache.org/repos/asf/jena/blob/d6770eaf/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromoteTDB.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromoteTDB.java b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromoteTDB.java
index c844c4b..e71d990 100644
--- a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromoteTDB.java
+++ b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromoteTDB.java
@@ -44,31 +44,6 @@ public class TestTransPromoteTDB extends AbstractTestTransPromote {
}
@Override
- protected boolean supportsReadCommitted() {
- return true ;
- }
-
- // Redundant
-
- @Override
- protected void setPromotion(boolean b) {
- }
-
- @Override
- protected boolean getPromotion() {
- return true;
- }
-
- @Override
- protected void setReadCommitted(boolean b) {
- }
-
- @Override
- protected boolean getReadCommitted() {
- return false;
- }
-
- @Override
protected Class<TDBTransactionException> getTransactionExceptionClass() {
return TDBTransactionException.class ;
}