You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/28 15:26:44 UTC
[37/49] ignite git commit: ignite-1607 Implemented deadlock-free
optimistic serializable tx mode
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 4074eee..c1e9202 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -25,12 +25,10 @@ import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
@@ -64,7 +62,6 @@ import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridFunc;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -78,7 +75,6 @@ import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT;
@@ -131,16 +127,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/** TX handler. */
private IgniteTxHandler txHandler;
- /** All transactions. */
- private final Queue<IgniteInternalTx> committedQ = new ConcurrentLinkedDeque8<>();
-
- /** Preparing transactions. */
- private final Queue<IgniteInternalTx> prepareQ = new ConcurrentLinkedDeque8<>();
-
- /** Minimum start version. */
- private final ConcurrentNavigableMap<GridCacheVersion, AtomicInt> startVerCnts =
- new ConcurrentSkipListMap<>();
-
/** Committed local transactions. */
private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVers =
new GridBoundedConcurrentOrderedMap<>(Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT));
@@ -308,41 +294,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* USE ONLY FOR MEMORY PROFILING DURING TESTS.
*/
@Override public void printMemoryStats() {
- IgniteInternalTx firstTx = committedQ.peek();
-
- int committedSize = committedQ.size();
-
- Map.Entry<GridCacheVersion, AtomicInt> startVerEntry = startVerCnts.firstEntry();
-
- GridCacheVersion minStartVer = null;
- long dur = 0;
-
- if (committedSize > 3000) {
- minStartVer = new GridCacheVersion(Integer.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE, 0);
-
- IgniteInternalTx stuck = null;
-
- for (IgniteInternalTx tx : txs())
- if (tx.startVersion().isLess(minStartVer)) {
- minStartVer = tx.startVersion();
- dur = U.currentTimeMillis() - tx.startTime();
-
- stuck = tx;
- }
-
- X.println("Stuck transaction: " + stuck);
- }
-
X.println(">>> ");
X.println(">>> Transaction manager memory stats [grid=" + cctx.gridName() + ']');
X.println(">>> threadMapSize: " + threadMap.size());
- X.println(">>> idMap [size=" + idMap.size() + ", minStartVer=" + minStartVer + ", dur=" + dur + "ms]");
- X.println(">>> committedQueue [size=" + committedSize +
- ", firstStartVersion=" + (firstTx == null ? "null" : firstTx.startVersion()) +
- ", firstEndVersion=" + (firstTx == null ? "null" : firstTx.endVersion()) + ']');
- X.println(">>> prepareQueueSize: " + prepareQ.size());
- X.println(">>> startVerCntsSize [size=" + startVerCnts.size() +
- ", firstVer=" + startVerEntry + ']');
+ X.println(">>> idMap [size=" + idMap.size() + ']');
X.println(">>> completedVersSize: " + completedVers.size());
}
@@ -361,27 +316,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * @return Committed queue size.
- */
- public int commitQueueSize() {
- return committedQ.size();
- }
-
- /**
- * @return Prepare queue size.
- */
- public int prepareQueueSize() {
- return prepareQ.size();
- }
-
- /**
- * @return Start version counts.
- */
- public int startVersionCountsSize() {
- return startVerCnts.size();
- }
-
- /**
* @return Committed versions size.
*/
public int completedVersionsSize() {
@@ -493,42 +427,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return null;
}
- if (cctx.txConfig().isTxSerializableEnabled()) {
- AtomicInt next = new AtomicInt(1);
-
- boolean loop = true;
-
- while (loop) {
- AtomicInt prev = startVerCnts.putIfAbsent(tx.startVersion(), next);
-
- if (prev == null)
- break; // Put succeeded - exit.
-
- // Previous value was 0, which means that it will be deleted
- // by another thread in "decrementStartVersionCount(..)" method.
- // In that case, we delete here too, so we can safely try again.
- for (;;) {
- int p = prev.get();
-
- assert p >= 0 : p;
-
- if (p == 0) {
- if (startVerCnts.remove(tx.startVersion(), prev))
- if (log.isDebugEnabled())
- log.debug("Removed count from onCreated callback: " + tx);
-
- break; // Retry outer loop.
- }
-
- if (prev.compareAndSet(p, p + 1)) {
- loop = false; // Increment succeeded - exit outer loop.
-
- break;
- }
- }
- }
- }
-
if (tx.timeout() > 0) {
cctx.time().addTimeoutObject(tx);
@@ -822,117 +720,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
}
- boolean txSerEnabled = cctx.txConfig().isTxSerializableEnabled();
-
- // Clean up committed transactions queue.
- if (tx.pessimistic() && tx.local()) {
- if (tx.enforceSerializable() && txSerEnabled) {
- for (Iterator<IgniteInternalTx> it = committedQ.iterator(); it.hasNext();) {
- IgniteInternalTx committedTx = it.next();
-
- assert committedTx != tx;
-
- // Clean up.
- if (isSafeToForget(committedTx))
- it.remove();
- }
- }
-
- // Nothing else to do in pessimistic mode.
- return;
- }
-
- if (txSerEnabled && tx.optimistic() && tx.enforceSerializable()) {
- Set<IgniteTxKey> readSet = tx.readSet();
- Set<IgniteTxKey> writeSet = tx.writeSet();
-
- GridCacheVersion startTn = tx.startVersion();
-
- GridCacheVersion finishTn = cctx.versions().last();
-
- // Add future to prepare queue only on first prepare call.
- if (tx.markPreparing())
- prepareQ.offer(tx);
-
- // Check that our read set does not intersect with write set
- // of all transactions that completed their write phase
- // while our transaction was in read phase.
- for (Iterator<IgniteInternalTx> it = committedQ.iterator(); it.hasNext();) {
- IgniteInternalTx committedTx = it.next();
-
- assert committedTx != tx;
-
- // Clean up.
- if (isSafeToForget(committedTx)) {
- it.remove();
-
- continue;
- }
-
- GridCacheVersion tn = committedTx.endVersion();
-
- // We only care about transactions
- // with tn > startTn and tn <= finishTn
- if (tn.compareTo(startTn) <= 0 || tn.compareTo(finishTn) > 0)
- continue;
-
- if (tx.serializable()) {
- if (GridFunc.intersects(committedTx.writeSet(), readSet)) {
- tx.setRollbackOnly();
-
- throw new IgniteTxOptimisticCheckedException("Failed to prepare transaction " +
- "(committed vs. read-set conflict): " + tx);
- }
- }
- }
-
- // Check that our read and write sets do not intersect with write
- // sets of all active transactions.
- for (Iterator<IgniteInternalTx> iter = prepareQ.iterator(); iter.hasNext();) {
- IgniteInternalTx prepareTx = iter.next();
-
- if (prepareTx == tx)
- // Skip yourself.
- continue;
-
- // Optimistically remove completed transactions.
- if (prepareTx.done()) {
- iter.remove();
-
- if (log.isDebugEnabled())
- log.debug("Removed finished transaction from active queue: " + prepareTx);
-
- continue;
- }
-
- // Check if originating node left.
- if (cctx.discovery().node(prepareTx.nodeId()) == null) {
- iter.remove();
-
- rollbackTx(prepareTx);
-
- if (log.isDebugEnabled())
- log.debug("Removed and rolled back transaction because sender node left grid: " +
- CU.txString(prepareTx));
-
- continue;
- }
-
- if (tx.serializable() && !prepareTx.isRollbackOnly()) {
- Set<IgniteTxKey> prepareWriteSet = prepareTx.writeSet();
-
- if (GridFunc.intersects(prepareWriteSet, readSet, writeSet)) {
- // Remove from active set.
- iter.remove();
-
- tx.setRollbackOnly();
-
- throw new IgniteTxOptimisticCheckedException(
- "Failed to prepare transaction (read-set/write-set conflict): " + tx);
- }
- }
- }
- }
+ if (tx.pessimistic() && tx.local())
+ return; // Nothing else to do in pessimistic mode.
// Optimistic.
assert tx.optimistic() || !tx.local();
@@ -945,40 +734,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param tx Transaction to check.
- * @return {@code True} if transaction can be discarded.
- */
- private boolean isSafeToForget(IgniteInternalTx tx) {
- Map.Entry<GridCacheVersion, AtomicInt> e = startVerCnts.firstEntry();
-
- if (e == null)
- return true;
-
- assert e.getValue().get() >= 0;
-
- return tx.endVersion().compareTo(e.getKey()) <= 0;
- }
-
- /**
- * Decrement start version count.
- *
- * @param tx Cache transaction.
- */
- private void decrementStartVersionCount(IgniteInternalTx tx) {
- AtomicInt cnt = startVerCnts.get(tx.startVersion());
-
- assert cnt != null : "Failed to find start version count for transaction [startVerCnts=" + startVerCnts +
- ", tx=" + tx + ']';
-
- assert cnt.get() > 0;
-
- if (cnt.decrementAndGet() == 0)
- if (startVerCnts.remove(tx.startVersion(), cnt))
- if (log.isDebugEnabled())
- log.debug("Removed start version for transaction: " + tx);
- }
-
- /**
* @param tx Transaction.
*/
private void removeObsolete(IgniteInternalTx tx) {
@@ -1237,6 +992,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param tx Transaction.
+ * @return {@code True} if transaction read entries should be unlocked.
+ */
+ private boolean unlockReadEntries(IgniteInternalTx tx) {
+ if (tx.pessimistic())
+ return !tx.readCommitted();
+ else
+ return tx.serializable();
+ }
+
+ /**
* Commits a transaction.
*
* @param tx Transaction to commit.
@@ -1290,8 +1056,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// 4. Unlock write resources.
unlockMultiple(tx, tx.writeEntries());
- // 5. For pessimistic transaction, unlock read resources if required.
- if (tx.pessimistic() && !tx.readCommitted())
+ // 5. Unlock read resources if required.
+ if (unlockReadEntries(tx))
unlockMultiple(tx, tx.readEntries());
// 6. Notify evictions.
@@ -1303,25 +1069,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// 8. Assign transaction number at the end of transaction.
tx.endVersion(cctx.versions().next(tx.topologyVersion()));
- // 9. Clean start transaction number for this transaction.
- if (cctx.txConfig().isTxSerializableEnabled())
- decrementStartVersionCount(tx);
-
- // 10. Add to committed queue only if it is possible
- // that this transaction can affect other ones.
- if (cctx.txConfig().isTxSerializableEnabled() && tx.enforceSerializable() && !isSafeToForget(tx))
- committedQ.add(tx);
-
- // 11. Remove from per-thread storage.
+ // 9. Remove from per-thread storage.
clearThreadMap(tx);
- // 12. Unregister explicit locks.
+ // 10. Unregister explicit locks.
if (!tx.alternateVersions().isEmpty()) {
for (GridCacheVersion ver : tx.alternateVersions())
idMap.remove(ver);
}
- // 13. Remove Near-2-DHT mappings.
+ // 11. Remove Near-2-DHT mappings.
if (tx instanceof GridCacheMappedVersion) {
GridCacheVersion mapped = ((GridCacheMappedVersion)tx).mappedVersion();
@@ -1329,10 +1086,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
mappedVers.remove(mapped);
}
- // 14. Clear context.
+ // 12. Clear context.
resetContext();
- // 15. Update metrics.
+ // 13. Update metrics.
if (!tx.dht() && tx.local()) {
if (!tx.system())
cctx.txMetrics().onTxCommit();
@@ -1378,8 +1135,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// 2. Unlock write resources.
unlockMultiple(tx, tx.writeEntries());
- // 3. For pessimistic transaction, unlock read resources if required.
- if (tx.pessimistic() && !tx.readCommitted())
+ // 3. Unlock read resources if required.
+ if (unlockReadEntries(tx))
unlockMultiple(tx, tx.readEntries());
// 4. Notify evictions.
@@ -1388,26 +1145,22 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// 5. Remove obsolete entries.
removeObsolete(tx);
- // 6. Clean start transaction number for this transaction.
- if (cctx.txConfig().isTxSerializableEnabled())
- decrementStartVersionCount(tx);
-
- // 7. Remove from per-thread storage.
+ // 6. Remove from per-thread storage.
clearThreadMap(tx);
- // 8. Unregister explicit locks.
+ // 7. Unregister explicit locks.
if (!tx.alternateVersions().isEmpty())
for (GridCacheVersion ver : tx.alternateVersions())
idMap.remove(ver);
- // 9. Remove Near-2-DHT mappings.
+ // 8. Remove Near-2-DHT mappings.
if (tx instanceof GridCacheMappedVersion)
mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion());
- // 10. Clear context.
+ // 9. Clear context.
resetContext();
- // 11. Update metrics.
+ // 10. Update metrics.
if (!tx.dht() && tx.local()) {
if (!tx.system())
cctx.txMetrics().onTxRollback();
@@ -1445,30 +1198,26 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// 1. Unlock write resources.
unlockMultiple(tx, tx.writeEntries());
- // 2. For pessimistic transaction, unlock read resources if required.
- if (tx.pessimistic() && !tx.readCommitted())
+ // 2. Unlock read resources if required.
+ if (unlockReadEntries(tx))
unlockMultiple(tx, tx.readEntries());
// 3. Notify evictions.
notifyEvitions(tx);
- // 4. Clean start transaction number for this transaction.
- if (cctx.txConfig().isTxSerializableEnabled())
- decrementStartVersionCount(tx);
-
- // 5. Remove from per-thread storage.
+ // 4. Remove from per-thread storage.
clearThreadMap(tx);
- // 6. Unregister explicit locks.
+ // 5. Unregister explicit locks.
if (!tx.alternateVersions().isEmpty())
for (GridCacheVersion ver : tx.alternateVersions())
idMap.remove(ver);
- // 7. Remove Near-2-DHT mappings.
+ // 6. Remove Near-2-DHT mappings.
if (tx instanceof GridCacheMappedVersion)
mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion());
- // 8. Clear context.
+ // 7. Clear context.
resetContext();
if (log.isDebugEnabled())
@@ -1635,6 +1384,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// we wait for the lock.
long timeout = tx.timeout() == 0 ? 0 : remainingTime;
+ GridCacheVersion serOrder = (tx.serializable() && tx.optimistic()) ? tx.nearXidVersion() : null;
+
for (IgniteTxEntry txEntry1 : entries) {
// Check if this entry was prepared before.
if (!txEntry1.markPrepared() || txEntry1.explicitVersion() != null)
@@ -1649,7 +1400,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert !entry1.detached() : "Expected non-detached entry for near transaction " +
"[locNodeId=" + cctx.localNodeId() + ", entry=" + entry1 + ']';
- if (!entry1.tmLock(tx, timeout)) {
+ GridCacheVersion serReadVer = txEntry1.serializableReadVersion();
+
+ assert serReadVer == null || (tx.optimistic() && tx.serializable()) : txEntry1;
+
+ if (!entry1.tmLock(tx, timeout, serOrder, serReadVer)) {
// Unlock locks locked so far.
for (IgniteTxEntry txEntry2 : entries) {
if (txEntry2 == txEntry1)
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 36f1c36..68d03cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -53,9 +53,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
/** Last version. */
private volatile GridCacheVersion last;
- /** Serializable transaction flag. */
- private boolean txSerEnabled;
-
/** Data center ID. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private byte dataCenterId;
@@ -64,6 +61,9 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
private long gridStartTime;
/** */
+ private GridCacheVersion ISOLATED_STREAMER_VER;
+
+ /** */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
assert evt.type() == EVT_NODE_METRICS_UPDATED;
@@ -79,8 +79,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
/** {@inheritDoc} */
@Override public void start0() throws IgniteCheckedException {
- txSerEnabled = cctx.gridConfig().getTransactionConfiguration().isTxSerializableEnabled();
-
last = new GridCacheVersion(0, 0, order.get(), 0, dataCenterId);
cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_METRICS_UPDATED);
@@ -154,6 +152,27 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
}
/**
+ * Version for entries loaded with isolated streamer. It should be less than any version generated
+ * for entry updates.
+ *
+ * @return Version for entries loaded with isolated streamer.
+ */
+ public GridCacheVersion isolatedStreamerVersion() {
+ if (ISOLATED_STREAMER_VER == null) {
+ long topVer = 1;
+
+ if (gridStartTime == 0)
+ gridStartTime = cctx.kernalContext().discovery().gridStartTime();
+
+ topVer += (gridStartTime - TOP_VER_BASE_TIME) / 1000;
+
+ ISOLATED_STREAMER_VER = new GridCacheVersion((int)topVer, 0, 0, 1, dataCenterId);
+ }
+
+ return ISOLATED_STREAMER_VER;
+ }
+
+ /**
* @return Next version based on current topology.
*/
public GridCacheVersion next() {
@@ -235,36 +254,18 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
int locNodeOrder = (int)cctx.localNode().order();
- if (txSerEnabled) {
- synchronized (this) {
- long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet();
-
- GridCacheVersion next = new GridCacheVersion(
- (int)topVer,
- globalTime,
- ord,
- locNodeOrder,
- dataCenterId);
-
- last = next;
+ long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet();
- return next;
- }
- }
- else {
- long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet();
-
- GridCacheVersion next = new GridCacheVersion(
- (int)topVer,
- globalTime,
- ord,
- locNodeOrder,
- dataCenterId);
+ GridCacheVersion next = new GridCacheVersion(
+ (int)topVer,
+ globalTime,
+ ord,
+ locNodeOrder,
+ dataCenterId);
- last = next;
+ last = next;
- return next;
- }
+ return next;
}
/**
@@ -273,12 +274,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Last generated version.
*/
public GridCacheVersion last() {
- if (txSerEnabled) {
- synchronized (this) {
- return last;
- }
- }
- else
- return last;
+ return last;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index ab2a6e8..2190bf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1556,7 +1556,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
- GridCacheVersion ver = cctx.versions().next(topVer);
+ GridCacheVersion ver = cctx.versions().isolatedStreamerVersion();
long ttl = CU.TTL_ETERNAL;
long expiryTime = CU.EXPIRE_TIME_ETERNAL;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
index a6e960d..6c4e894 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
@@ -54,7 +54,7 @@ import org.apache.ignite.lang.IgniteUuid;
* Read access with this level happens the same way as with {@link TransactionIsolation#REPEATABLE_READ} level.
* However, in {@link TransactionConcurrency#OPTIMISTIC} mode, if some transactions cannot be serially isolated
* from each other, then one winner will be picked and the other transactions in conflict will result in
- * {@link org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException} being thrown.
+ * {@link TransactionOptimisticException} being thrown.
* </li>
* </ul>
* <p>
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java
index d7671f0..c3be3c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java
@@ -42,8 +42,7 @@ public enum TransactionIsolation {
* @param ord Ordinal value.
* @return Enumerated value or {@code null} if ordinal out of range.
*/
- @Nullable
- public static TransactionIsolation fromOrdinal(int ord) {
+ @Nullable public static TransactionIsolation fromOrdinal(int ord) {
return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java
new file mode 100644
index 0000000..c2f9fab
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionOptimisticException;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
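+ * Runs concurrent puts and gets on a single key from several nodes and checks that all nodes return the same value afterwards.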
+ */
+public class CacheNearReaderUpdateTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private boolean client;
+
+ /** */
+ private static final int SRVS = 4;
+
+ /** */
+ private static final int CLIENTS = 3;
+
+ /** */
+ private static Map<Integer, Integer> storeMap = new ConcurrentHashMap8<>();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setClientMode(client);
+
+ cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(SRVS);
+
+ client = true;
+
+ startGridsMultiThreaded(SRVS, CLIENTS);
+
+ client = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 10 * 60_000;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetUpdateMultithreaded() throws Exception {
+ List<CacheConfiguration<Integer, Integer>> cfgs = new ArrayList<>();
+
+ cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false));
+ cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
+ cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true));
+ cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false));
+
+ {
+ CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false);
+
+ GridTestUtils.setMemoryMode(null, ccfg, GridTestUtils.TestMemoryMode.OFFHEAP_TIERED, 0, 0);
+
+ cfgs.add(ccfg);
+ }
+
+ final List<Ignite> putNodes = new ArrayList<>();
+
+ for (int i = 0; i < SRVS + CLIENTS - 1; i++)
+ putNodes.add(ignite(i));
+
+ final List<Ignite> getNodes = new ArrayList<>();
+
+ getNodes.add(ignite(SRVS + CLIENTS - 1));
+ getNodes.add(ignite(0));
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cfgs) {
+ logCacheInfo(ccfg);
+
+ getUpdateMultithreaded(ccfg, putNodes, getNodes, null, null);
+
+ if (ccfg.getAtomicityMode() == TRANSACTIONAL) {
+ getUpdateMultithreaded(ccfg, putNodes, getNodes, PESSIMISTIC, REPEATABLE_READ);
+
+ getUpdateMultithreaded(ccfg, putNodes, getNodes, OPTIMISTIC, REPEATABLE_READ);
+
+ getUpdateMultithreaded(ccfg, putNodes, getNodes, OPTIMISTIC, SERIALIZABLE);
+ }
+ }
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @param putNodes Nodes executing updates.
+ * @param getNodes Nodes executing gets.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @throws Exception If failed.
+ */
+    private void getUpdateMultithreaded(CacheConfiguration<Integer, Integer> ccfg,
+        final List<Ignite> putNodes,
+        final List<Ignite> getNodes,
+        final TransactionConcurrency concurrency,
+        final TransactionIsolation isolation) throws Exception {
+        log.info("Execute updates [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+        final Ignite ignite0 = ignite(0);
+
+        final String cacheName = ignite0.createCache(ccfg).getName();
+
+        try {
+            // Each iteration works on its own key.
+            for (int i = 0; i < 5; i++) {
+                final Integer key = i;
+
+                // Counters used to spread worker threads over the given nodes in round-robin fashion.
+                final AtomicInteger putThreadIdx = new AtomicInteger();
+                final AtomicInteger getThreadIdx = new AtomicInteger();
+
+                final int PUT_THREADS = 20;
+                final int GET_THREADS = 20;
+
+                // All update and get threads start their main loops together.
+                final CyclicBarrier barrier = new CyclicBarrier(PUT_THREADS + GET_THREADS);
+
+                final IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        int idx = putThreadIdx.getAndIncrement() % putNodes.size();
+
+                        Ignite ignite = putNodes.get(idx);
+
+                        IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
+
+                        IgniteTransactions txs = ignite.transactions();
+
+                        Thread.currentThread().setName("update-thread-" + ignite.name());
+
+                        barrier.await();
+
+                        for (int iter = 0; iter < 100; iter++) {
+                            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                            // Null concurrency means the update is done outside of an explicit transaction.
+                            if (concurrency != null) {
+                                try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                                    cache.put(key, rnd.nextInt());
+
+                                    tx.commit();
+                                }
+                                catch (TransactionOptimisticException ignore) {
+                                    // Optimistic failure is tolerated only for OPTIMISTIC SERIALIZABLE transactions.
+                                    assertEquals(OPTIMISTIC, concurrency);
+                                    assertEquals(SERIALIZABLE, isolation);
+                                }
+                            }
+                            else
+                                cache.put(key, rnd.nextInt());
+                        }
+
+                        return null;
+                    }
+                }, PUT_THREADS, "update-thread");
+
+                IgniteInternalFuture<?> getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        int idx = getThreadIdx.getAndIncrement() % getNodes.size();
+
+                        Ignite ignite = getNodes.get(idx);
+
+                        IgniteCache<Integer, Integer> cache;
+
+                        // Client nodes read through a near cache, other nodes use the regular cache proxy.
+                        if (ignite.configuration().isClientMode())
+                            cache = ignite.createNearCache(cacheName, new NearCacheConfiguration<Integer, Integer>());
+                        else
+                            cache = ignite.cache(cacheName);
+
+                        Thread.currentThread().setName("get-thread-" + ignite.name());
+
+                        barrier.await();
+
+                        // Keep reading the key for as long as updates are in progress.
+                        while (!updateFut.isDone())
+                            cache.get(key);
+
+                        return null;
+                    }
+                }, GET_THREADS, "get-thread");
+
+                updateFut.get();
+                getFut.get();
+
+                Integer val = (Integer)ignite0.cache(cacheName).get(key);
+
+                log.info("Iteration [iter=" + i + ", val=" + val + ']');
+
+                // Client nodes and nodes with near cache configured must hold the key locally after the reads.
+                for (Ignite getNode : getNodes) {
+                    IgniteCache<Integer, Integer> cache = getNode.cache(cacheName);
+
+                    if (getNode.configuration().isClientMode() ||
+                        cache.getConfiguration(CacheConfiguration.class).getNearConfiguration() != null)
+                        assertNotNull(cache.localPeek(key));
+                }
+
+                checkValue(key, val, cacheName);
+
+                // Update the key from every node in turn and verify the new value is visible on all nodes.
+                for (int n = 0; n < SRVS + CLIENTS; n++) {
+                    val = n;
+
+                    ignite(n).cache(cacheName).put(key, val);
+
+                    checkValue(key, val, cacheName);
+                }
+            }
+        }
+        finally {
+            destroyCache(ignite0, cacheName);
+        }
+    }
+
+ /**
+ * @param key Key.
+ * @param expVal Expected value.
+ * @param cacheName Cache name.
+ */
+ private void checkValue(Object key, Object expVal, String cacheName) {
+ for (int i = 0; i < SRVS + CLIENTS; i++) {
+ IgniteCache<Object, Object> cache = ignite(i).cache(cacheName);
+
+ assertEquals(expVal, cache.get(key));
+ }
+ }
+
+    /**
+     * Logs a one-line summary of the given cache configuration.
+     *
+     * @param ccfg Cache configuration to describe.
+     */
+    private void logCacheInfo(CacheConfiguration<?, ?> ccfg) {
+        StringBuilder msg = new StringBuilder("Test cache [mode=");
+
+        msg.append(ccfg.getCacheMode())
+            .append(", sync=").append(ccfg.getWriteSynchronizationMode())
+            .append(", backups=").append(ccfg.getBackups())
+            .append(", memMode=").append(ccfg.getMemoryMode())
+            .append(", near=").append(ccfg.getNearConfiguration() != null)
+            .append(", store=").append(ccfg.isWriteThrough())
+            .append(", evictPlc=").append(ccfg.getEvictionPolicy() != null)
+            .append(", swap=").append(ccfg.isSwapEnabled())
+            .append(", maxOffheap=").append(ccfg.getOffHeapMaxMemory())
+            .append(']');
+
+        log.info(msg.toString());
+    }
+
+    /**
+     * Clears the static store map, destroys the cache and then clears the test swap space SPI
+     * of every grid returned by {@code G.allGrids()}.
+     *
+     * @param ignite Node used to destroy the cache.
+     * @param cacheName Name of the cache to destroy.
+     */
+    private void destroyCache(Ignite ignite, String cacheName) {
+        storeMap.clear();
+
+        ignite.destroyCache(cacheName);
+
+        for (Ignite node : G.allGrids())
+            ((GridTestSwapSpaceSpi)node.configuration().getSwapSpaceSpi()).clearAll();
+    }
+
+    /**
+     * Builds a transactional cache configuration with the requested settings.
+     *
+     * @param cacheMode Cache mode.
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups, applied only for {@code PARTITIONED} cache mode.
+     * @param storeEnabled If {@code true} a test cache store with read-through and write-through is configured.
+     * @param nearCache If {@code true} near cache is enabled.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(
+        CacheMode cacheMode,
+        CacheWriteSynchronizationMode syncMode,
+        int backups,
+        boolean storeEnabled,
+        boolean nearCache) {
+        CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
+
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setCacheMode(cacheMode);
+        cfg.setWriteSynchronizationMode(syncMode);
+
+        // Backups are set only for partitioned caches.
+        if (PARTITIONED == cacheMode)
+            cfg.setBackups(backups);
+
+        if (nearCache)
+            cfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>());
+
+        if (!storeEnabled)
+            return cfg;
+
+        cfg.setCacheStoreFactory(new TestStoreFactory());
+        cfg.setWriteThrough(true);
+        cfg.setReadThrough(true);
+
+        return cfg;
+    }
+
+    /**
+     * Factory producing cache stores that keep their data in the static {@code storeMap}.
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>> {
+        /** {@inheritDoc} */
+        @Override public CacheStore<Integer, Integer> create() {
+            return new MapStore();
+        }
+
+        /**
+         * Cache store that forwards loads, writes and deletes to {@code storeMap}.
+         */
+        private static class MapStore extends CacheStoreAdapter<Integer, Integer> {
+            /** {@inheritDoc} */
+            @Override public Integer load(Integer key) throws CacheLoaderException {
+                return storeMap.get(key);
+            }
+
+            /** {@inheritDoc} */
+            @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> e) {
+                storeMap.put(e.getKey(), e.getValue());
+            }
+
+            /** {@inheritDoc} */
+            @Override public void delete(Object key) {
+                storeMap.remove(key);
+            }
+        }
+    }
+}