You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/10 08:15:44 UTC
[27/34] ignite git commit: Performance optimizations.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 1c82636..eb2ca2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -24,16 +24,14 @@ import java.io.ObjectOutput;
import java.io.ObjectStreamException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
@@ -47,7 +45,6 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -59,7 +56,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.lang.GridTuple;
@@ -199,20 +195,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
protected boolean transform;
/** Commit version. */
- private AtomicReference<GridCacheVersion> commitVer = new AtomicReference<>(null);
-
- /** Done marker. */
- protected final AtomicBoolean isDone = new AtomicBoolean(false);
+ private volatile GridCacheVersion commitVer;
/** */
private AtomicReference<FinalizationStatus> finalizing = new AtomicReference<>(FinalizationStatus.NONE);
- /** Preparing flag. */
- private AtomicBoolean preparing = new AtomicBoolean();
+ /** Done marker. */
+ protected volatile boolean isDone;
+
+ /** Preparing flag (no need for volatile modifier). */
+ private boolean preparing;
/** */
@GridToStringInclude
- private Map<Integer, Set<Integer>> invalidParts = new HashMap<>(3);
+ private Map<Integer, Set<Integer>> invalidParts;
/**
* Transaction state. Note that state is not protected, as we want to
@@ -230,17 +226,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** */
@GridToStringExclude
- private AtomicReference<GridFutureAdapter<IgniteInternalTx>> finFut = new AtomicReference<>();
+ private volatile GridFutureAdapter<IgniteInternalTx> finFut;
/** Topology version. */
@GridToStringInclude
- protected AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE);
-
- /** Mutex. */
- private final Lock lock = new ReentrantLock();
-
- /** Lock condition. */
- private final Condition cond = lock.newCondition();
+ protected volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
/** */
protected Map<UUID, Collection<UUID>> txNodes;
@@ -387,37 +377,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/**
- * Acquires lock.
- */
- @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
- protected final void lock() {
- lock.lock();
- }
-
- /**
- * Releases lock.
- */
- protected final void unlock() {
- lock.unlock();
- }
-
- /**
- * Signals all waiters.
- */
- protected final void signalAll() {
- cond.signalAll();
- }
-
- /**
- * Waits for signal.
- *
- * @throws InterruptedException If interrupted.
- */
- protected final void awaitSignal() throws InterruptedException {
- cond.await();
- }
-
- /**
* Checks whether near cache should be updated.
*
* @return Flag indicating whether near cache should be updated.
@@ -548,7 +507,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
- AffinityTopologyVersion res = topVer.get();
+ AffinityTopologyVersion res = topVer;
if (res.equals(AffinityTopologyVersion.NONE))
return cctx.exchange().topologyVersion();
@@ -558,16 +517,29 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersionSnapshot() {
- AffinityTopologyVersion ret = topVer.get();
+ AffinityTopologyVersion ret = topVer;
return AffinityTopologyVersion.NONE.equals(ret) ? null : ret;
}
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) {
- this.topVer.compareAndSet(AffinityTopologyVersion.NONE, topVer);
+ AffinityTopologyVersion topVer0 = this.topVer;
- return this.topVer.get();
+ if (!AffinityTopologyVersion.NONE.equals(topVer0))
+ return topVer0;
+
+ synchronized (this) {
+ topVer0 = this.topVer;
+
+ if (AffinityTopologyVersion.NONE.equals(topVer0)) {
+ this.topVer = topVer;
+
+ return topVer;
+ }
+
+ return topVer0;
+ }
}
/** {@inheritDoc} */
@@ -582,7 +554,14 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public boolean markPreparing() {
- return preparing.compareAndSet(false, true);
+ synchronized (this) {
+ if (preparing)
+ return false;
+
+ preparing = true;
+
+ return true;
+ }
}
/**
@@ -730,15 +709,18 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public Map<Integer, Set<Integer>> invalidPartitions() {
- return invalidParts;
+ return invalidParts == null ? Collections.<Integer, Set<Integer>>emptyMap() : invalidParts;
}
/** {@inheritDoc} */
@Override public void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int part) {
+ if (invalidParts == null)
+ invalidParts = new HashMap<>();
+
Set<Integer> parts = invalidParts.get(cacheCtx.cacheId());
if (parts == null) {
- parts = new GridLeanSet<>();
+ parts = new HashSet<>();
invalidParts.put(cacheCtx.cacheId(), parts);
}
@@ -879,32 +861,71 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public boolean done() {
- return isDone.get();
+ return isDone;
}
/**
- * @return Commit version.
+ * @return {@code True} if done flag has been set by this call.
*/
- @Override public GridCacheVersion commitVersion() {
- initCommitVersion();
+ private boolean setDone() {
+ boolean isDone0 = isDone;
+
+ if (isDone0)
+ return false;
+
+ synchronized (this) {
+ isDone0 = isDone;
- return commitVer.get();
+ if (isDone0)
+ return false;
+
+ isDone = true;
+
+ return true;
+ }
}
/**
- * @param commitVer Commit version.
- * @return {@code True} if set to not null value.
+ * @return Commit version.
*/
- @Override public boolean commitVersion(GridCacheVersion commitVer) {
- return commitVer != null && this.commitVer.compareAndSet(null, commitVer);
+ @Override public GridCacheVersion commitVersion() {
+ GridCacheVersion commitVer0 = commitVer;
+
+ if (commitVer0 != null)
+ return commitVer0;
+
+ synchronized (this) {
+ commitVer0 = commitVer;
+
+ if (commitVer0 != null)
+ return commitVer0;
+
+ commitVer = commitVer0 = xidVer;
+
+ return commitVer0;
+ }
}
/**
- *
+ * @param commitVer Commit version.
*/
- public void initCommitVersion() {
- if (commitVer.get() == null)
- commitVer.compareAndSet(null, xidVer);
+ @Override public void commitVersion(GridCacheVersion commitVer) {
+ if (commitVer == null)
+ return;
+
+ GridCacheVersion commitVer0 = this.commitVer;
+
+ if (commitVer0 != null)
+ return;
+
+ synchronized (this) {
+ commitVer0 = this.commitVer;
+
+ if (commitVer0 != null)
+ return;
+
+ this.commitVer = commitVer;
+ }
}
/**
@@ -916,7 +937,19 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED)
rollback();
- awaitCompletion();
+ synchronized (this) {
+ try {
+ while (!done())
+ wait();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ if (!done())
+ throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " +
+ this, e);
+ }
+ }
}
/** {@inheritDoc} */
@@ -930,29 +963,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/* No-op. */
}
- /**
- * Awaits transaction completion.
- *
- * @throws IgniteCheckedException If waiting failed.
- */
- protected void awaitCompletion() throws IgniteCheckedException {
- lock();
-
- try {
- while (!done())
- awaitSignal();
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- if (!done())
- throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " + this, e);
- }
- finally {
- unlock();
- }
- }
-
/** {@inheritDoc} */
@Override public boolean internal() {
return internal;
@@ -1019,22 +1029,27 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
@Override public IgniteInternalFuture<IgniteInternalTx> finishFuture() {
- GridFutureAdapter<IgniteInternalTx> fut = finFut.get();
+ GridFutureAdapter<IgniteInternalTx> fut = finFut;
if (fut == null) {
- fut = new GridFutureAdapter<IgniteInternalTx>() {
- @Override public String toString() {
- return S.toString(GridFutureAdapter.class, this, "tx", IgniteTxAdapter.this);
- }
- };
+ synchronized (this) {
+ fut = finFut;
- if (!finFut.compareAndSet(null, fut))
- fut = finFut.get();
+ if (fut == null) {
+ fut = new GridFutureAdapter<IgniteInternalTx>() {
+ @Override public String toString() {
+ return S.toString(GridFutureAdapter.class, this, "tx", IgniteTxAdapter.this);
+ }
+ };
+
+ finFut = fut;
+ }
+ }
}
assert fut != null;
- if (isDone.get())
+ if (isDone)
fut.onDone(this);
return fut;
@@ -1059,9 +1074,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
boolean notify = false;
- lock();
-
- try {
+ synchronized (this) {
prev = this.state;
switch (state) {
@@ -1087,7 +1100,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
case UNKNOWN: {
- if (isDone.compareAndSet(false, true))
+ if (setDone())
notify = true;
valid = prev == ROLLING_BACK || prev == COMMITTING;
@@ -1096,7 +1109,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
case COMMITTED: {
- if (isDone.compareAndSet(false, true))
+ if (setDone())
notify = true;
valid = prev == COMMITTING;
@@ -1105,7 +1118,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
case ROLLED_BACK: {
- if (isDone.compareAndSet(false, true))
+ if (setDone())
notify = true;
valid = prev == ROLLING_BACK;
@@ -1135,8 +1148,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
if (log.isDebugEnabled())
log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']');
- // Notify of state change.
- signalAll();
+ notifyAll();
}
else {
if (log.isDebugEnabled())
@@ -1144,12 +1156,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
", tx=" + this + ']');
}
}
- finally {
- unlock();
- }
if (notify) {
- GridFutureAdapter<IgniteInternalTx> fut = finFut.get();
+ GridFutureAdapter<IgniteInternalTx> fut = finFut;
if (fut != null)
fut.onDone(this);
@@ -2026,8 +2035,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Override public boolean commitVersion(GridCacheVersion commitVer) {
- return false;
+ @Override public void commitVersion(GridCacheVersion commitVer) {
+ // No-op.
}
/** {@inheritDoc} */
@@ -2037,7 +2046,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public void prepare() throws IgniteCheckedException {
-
+ // No-op.
}
/** {@inheritDoc} */
@@ -2047,7 +2056,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public void endVersion(GridCacheVersion endVer) {
-
+ // No-op.
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index d9786a8..570aa48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -876,7 +876,7 @@ public class IgniteTxHandler {
log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']');
if (req.checkCommitted()) {
- sendReply(nodeId, req, checkDhtRemoteTxCommitted(req.version()));
+ sendReply(nodeId, req, !ctx.tm().addRolledbackTx(null, req.version()));
return;
}
@@ -896,8 +896,11 @@ public class IgniteTxHandler {
if (req.replyRequired()) {
IgniteInternalFuture completeFut;
- IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ? null : dhtTx.done() ? null : dhtTx.finishFuture();
- IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ? null : nearTx.done() ? null : nearTx.finishFuture();
+ IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ?
+ null : dhtTx.done() ? null : dhtTx.finishFuture();
+
+ IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ?
+ null : nearTx.done() ? null : nearTx.finishFuture();
if (dhtFin != null && nearFin != null) {
GridCompoundFuture fut = new GridCompoundFuture();
@@ -914,8 +917,7 @@ public class IgniteTxHandler {
if (completeFut != null) {
completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override
- public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) {
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) {
sendReply(nodeId, req, true);
}
});
@@ -928,24 +930,6 @@ public class IgniteTxHandler {
}
/**
- * Checks whether DHT remote transaction with given version has been committed. If not, will add version
- * to rollback version set so that late response will not falsely commit this transaction.
- *
- * @param writeVer Write version to check.
- * @return {@code True} if transaction has been committed, {@code false} otherwise.
- */
- public boolean checkDhtRemoteTxCommitted(GridCacheVersion writeVer) {
- assert writeVer != null;
-
- boolean committed = true;
-
- if (ctx.tm().addRolledbackTx(writeVer))
- committed = false;
-
- return committed;
- }
-
- /**
* @param nodeId Node ID.
* @param tx Transaction.
* @param req Request.
@@ -953,7 +937,8 @@ public class IgniteTxHandler {
protected void finish(
UUID nodeId,
IgniteTxRemoteEx tx,
- GridDhtTxFinishRequest req) {
+ GridDhtTxFinishRequest req
+ ) {
// We don't allow explicit locks for transactions and
// therefore immediately return if transaction is null.
// However, we may decide to relax this restriction in
@@ -961,9 +946,9 @@ public class IgniteTxHandler {
if (tx == null) {
if (req.commit())
// Must be some long time duplicate, but we add it anyway.
- ctx.tm().addCommittedTx(req.version(), null);
+ ctx.tm().addCommittedTx(tx, req.version(), null);
else
- ctx.tm().addRolledbackTx(req.version());
+ ctx.tm().addRolledbackTx(tx, req.version());
if (log.isDebugEnabled())
log.debug("Received finish request for non-existing transaction (added to completed set) " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 82e5f2a..2c7bf8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -206,21 +206,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
int taskNameHash
) {
super(
- cctx,
- xidVer,
- implicit,
- implicitSingle,
- /*local*/true,
- sys,
+ cctx,
+ xidVer,
+ implicit,
+ implicitSingle,
+ /*local*/true,
+ sys,
plc,
- concurrency,
- isolation,
+ concurrency,
+ isolation,
timeout,
invalidate,
- storeEnabled,
- onePhaseCommit,
- txSize,
- subjId,
+ storeEnabled,
+ onePhaseCommit,
+ txSize,
+ subjId,
taskNameHash
);
@@ -1054,7 +1054,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
eventNodeId(),
txEntry.nodeId(),
false,
- false,
evt,
metrics,
topVer,
@@ -1072,7 +1071,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
nodeId,
false,
false,
- false,
metrics,
topVer,
CU.empty0(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 1f51b8a..c2e7dea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -18,17 +18,16 @@
package org.apache.ignite.internal.processors.cache.transactions;
import java.io.Externalizable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
-import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
@@ -62,6 +61,7 @@ import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -75,6 +75,7 @@ import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedHashMap;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT;
@@ -95,6 +96,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
/**
* Cache transaction manager.
@@ -128,8 +130,18 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
private IgniteTxHandler txHandler;
/** Committed local transactions. */
- private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVers =
- new GridBoundedConcurrentOrderedMap<>(Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT));
+ private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVersSorted =
+ new GridBoundedConcurrentOrderedMap<>(
+ Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT));
+
+ /** Committed local transactions. */
+ private final ConcurrentLinkedHashMap<GridCacheVersion, Boolean> completedVersHashMap =
+ new ConcurrentLinkedHashMap<>(
+ Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
+ 0.75f,
+ Runtime.getRuntime().availableProcessors() * 2,
+ Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
+ PER_SEGMENT_Q);
/** Transaction finish synchronizer. */
private GridCacheTxFinishSync txFinishSync;
@@ -298,7 +310,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
X.println(">>> Transaction manager memory stats [grid=" + cctx.gridName() + ']');
X.println(">>> threadMapSize: " + threadMap.size());
X.println(">>> idMap [size=" + idMap.size() + ']');
- X.println(">>> completedVersSize: " + completedVers.size());
+ X.println(">>> completedVersSortedSize: " + completedVersSorted.size());
+ X.println(">>> completedVersHashMapSize: " + completedVersHashMap.sizex());
}
/**
@@ -319,7 +332,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return Committed versions size.
*/
public int completedVersionsSize() {
- return completedVers.size();
+ return completedVersHashMap.size();
}
/**
@@ -329,7 +342,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* {@code false} otherwise.
*/
public boolean isCompleted(IgniteInternalTx tx) {
- return completedVers.containsKey(tx.xidVersion());
+ return completedVersHashMap.containsKey(tx.xidVersion());
}
/**
@@ -770,65 +783,59 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param map Collection to copy.
- * @param expVal Values to copy.
- * @return Copy of the collection.
+ * @param min Minimum version.
+ * @return Pair [committed, rolledback] - never {@code null}, elements potentially empty,
+ * but also never {@code null}.
*/
- private Collection<GridCacheVersion> copyOf(Map<GridCacheVersion, Boolean> map, boolean expVal) {
- Collection<GridCacheVersion> l = new LinkedList<>();
+ public IgnitePair<Collection<GridCacheVersion>> versions(GridCacheVersion min) {
+ Collection<GridCacheVersion> committed = null;
+ Collection<GridCacheVersion> rolledback = null;
- for (Map.Entry<GridCacheVersion, Boolean> e : map.entrySet()) {
- if (e.getValue() == expVal)
- l.add(e.getKey());
- }
+ for (Map.Entry<GridCacheVersion, Boolean> e : completedVersSorted.tailMap(min, true).entrySet()) {
+ if (e.getValue()) {
+ if (committed == null)
+ committed = new ArrayList<>();
- return l;
- }
+ committed.add(e.getKey());
+ }
+ else {
+ if (rolledback == null)
+ rolledback = new ArrayList<>();
- /**
- * Gets committed transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive?
- *
- * @param min Start (or minimum) version.
- * @return Committed transactions starting from the given version (non-inclusive).
- */
- public Collection<GridCacheVersion> committedVersions(GridCacheVersion min) {
- ConcurrentNavigableMap<GridCacheVersion, Boolean> tail
- = completedVers.tailMap(min, true);
+ rolledback.add(e.getKey());
+ }
+ }
- return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, true);
+ return F.pair(
+ committed == null ? Collections.<GridCacheVersion>emptyList() : committed,
+ rolledback == null ? Collections.<GridCacheVersion>emptyList() : rolledback);
}
/**
- * Gets rolledback transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive?
- *
- * @param min Start (or minimum) version.
- * @return Committed transactions starting from the given version (non-inclusive).
+ * @return Collection of active transactions.
*/
- public Collection<GridCacheVersion> rolledbackVersions(GridCacheVersion min) {
- ConcurrentNavigableMap<GridCacheVersion, Boolean> tail
- = completedVers.tailMap(min, true);
-
- return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, false);
+ public Collection<IgniteInternalTx> activeTransactions() {
+ return F.concat(false, idMap.values(), nearIdMap.values());
}
/**
* @param tx Tx to remove.
*/
public void removeCommittedTx(IgniteInternalTx tx) {
- completedVers.remove(tx.xidVersion(), true);
+ completedVersHashMap.remove(tx.xidVersion(), true);
+
+ if (tx.needsCompletedVersions())
+ completedVersSorted.remove(tx.xidVersion(), true);
}
/**
* @param tx Committed transaction.
- * @return If transaction was not already present in committed set.
*/
- public boolean addCommittedTx(IgniteInternalTx tx) {
- boolean res = addCommittedTx(tx.xidVersion(), tx.nearXidVersion());
+ public void addCommittedTx(IgniteInternalTx tx) {
+ addCommittedTx(tx, tx.xidVersion(), tx.nearXidVersion());
if (!tx.local() && !tx.near() && tx.onePhaseCommit())
- addCommittedTx(tx.nearXidVersion(), null);
-
- return res;
+ addCommittedTx(tx, tx.nearXidVersion(), null);
}
/**
@@ -836,60 +843,52 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return If transaction was not already present in committed set.
*/
public boolean addRolledbackTx(IgniteInternalTx tx) {
- return addRolledbackTx(tx.xidVersion());
- }
-
- /**
- * @return Collection of active transactions.
- */
- public Collection<IgniteInternalTx> activeTransactions() {
- return F.concat(false, idMap.values(), nearIdMap.values());
+ return addRolledbackTx(tx, tx.xidVersion());
}
/**
+ * @param tx Tx.
* @param xidVer Completed transaction version.
* @param nearXidVer Optional near transaction ID.
* @return If transaction was not already present in completed set.
*/
- public boolean addCommittedTx(GridCacheVersion xidVer, @Nullable GridCacheVersion nearXidVer) {
+ public boolean addCommittedTx(
+ IgniteInternalTx tx,
+ GridCacheVersion xidVer,
+ @Nullable GridCacheVersion nearXidVer
+ ) {
if (nearXidVer != null)
xidVer = new CommittedVersion(xidVer, nearXidVer);
- Boolean committed = completedVers.putIfAbsent(xidVer, true);
+ Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, true);
- if (committed == null || committed) {
- if (log.isDebugEnabled())
- log.debug("Added transaction to committed version set: " + xidVer);
+ if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
+ Boolean b = completedVersSorted.putIfAbsent(xidVer, true);
- return true;
+ assert b == null;
}
- else {
- if (log.isDebugEnabled())
- log.debug("Transaction is already present in rolled back version set: " + xidVer);
- return false;
- }
+ return committed0 == null || committed0;
}
/**
+ * @param tx Tx.
* @param xidVer Completed transaction version.
* @return If transaction was not already present in completed set.
*/
- public boolean addRolledbackTx(GridCacheVersion xidVer) {
- Boolean committed = completedVers.putIfAbsent(xidVer, false);
+ public boolean addRolledbackTx(
+ IgniteInternalTx tx,
+ GridCacheVersion xidVer
+ ) {
+ Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, false);
- if (committed == null || !committed) {
- if (log.isDebugEnabled())
- log.debug("Added transaction to rolled back version set: " + xidVer);
+ if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
+ Boolean b = completedVersSorted.putIfAbsent(xidVer, false);
- return true;
+ assert b == null;
}
- else {
- if (log.isDebugEnabled())
- log.debug("Transaction is already present in committed version set: " + xidVer);
- return false;
- }
+ return committed0 == null || !committed0;
}
/**
@@ -903,7 +902,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert min != null;
- tx.completedVersions(min, committedVersions(min), rolledbackVersions(min));
+ IgnitePair<Collection<GridCacheVersion>> versPair = versions(min);
+
+ tx.completedVersions(min, versPair.get1(), versPair.get2());
}
}
@@ -1027,18 +1028,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* so we don't do it here.
*/
- Boolean committed = completedVers.get(tx.xidVersion());
+ Boolean committed = completedVersHashMap.get(tx.xidVersion());
// 1. Make sure that committed version has been recorded.
if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
uncommitTx(tx);
- GridCacheVersion first = completedVers.isEmpty() ? null : completedVers.firstKey();
- GridCacheVersion last = completedVers.isEmpty() ? null : completedVers.lastKey();
-
throw new IgniteException("Missing commit version (consider increasing " +
- IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" +
- first + ", lastVer=" + last + ", tx=" + tx.xid() + ']');
+ IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() +
+ ", tx=" + tx.getClass().getSimpleName() + ']');
}
ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
@@ -1578,12 +1576,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return resFut;
}
- Boolean committed = null;
-
- for (Map.Entry<GridCacheVersion, Boolean> entry : completedVers.entrySet()) {
- if (entry.getValue() == null)
- continue;
+ boolean committed = false;
+ for (Map.Entry<GridCacheVersion, Boolean> entry : completedVersHashMap.entrySet()) {
if (entry.getKey() instanceof CommittedVersion) {
CommittedVersion comm = (CommittedVersion)entry.getKey();
@@ -1598,7 +1593,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Near transaction committed: " + committed);
- resFut.onDone(committed != null && committed);
+ resFut.onDone(committed);
return resFut;
}
@@ -1702,7 +1697,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// Not all transactions were found. Need to scan committed versions to check
// if transaction was already committed.
- for (Map.Entry<GridCacheVersion, Boolean> e : completedVers.entrySet()) {
+ for (Map.Entry<GridCacheVersion, Boolean> e : completedVersHashMap.entrySet()) {
if (!e.getValue())
continue;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java
index 04d1a85..7aa3734 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java
@@ -116,9 +116,6 @@ public class GridBoundedConcurrentLinkedHashMap<K, V> extends ConcurrentLinkedHa
/** {@inheritDoc} */
@Override public String toString() {
- // TODO GG-4788
- return policy() != SINGLE_Q ?
- S.toString(GridBoundedConcurrentLinkedHashMap.class, this) :
- S.toString(GridBoundedConcurrentLinkedHashMap.class, this, "entrySet", keySet());
+ return S.toString(GridBoundedConcurrentLinkedHashMap.class, this, "entrySet", keySet());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java
index a06f2ff..2801839 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java
@@ -156,9 +156,6 @@ public class GridBoundedConcurrentLinkedHashSet<E> extends GridSetWrapper<E> {
/** {@inheritDoc} */
@Override public String toString() {
- // TODO GG-4788
- return ((ConcurrentLinkedHashMap<E, Object>)map()).policy() != SINGLE_Q ?
- S.toString(GridBoundedConcurrentLinkedHashSet.class, this) :
- S.toString(GridBoundedConcurrentLinkedHashSet.class, this, "elements", map().keySet());
+ return S.toString(GridBoundedConcurrentLinkedHashSet.class, this, "elements", map().keySet());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
index b091652..3f6db30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.util;
import java.util.Comparator;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,7 +45,7 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap
private static final long serialVersionUID = 0L;
/** Element count. */
- private final AtomicInteger cnt = new AtomicInteger(0);
+ private final AtomicInteger cnt = new AtomicInteger();
/** Maximum size. */
private int max;
@@ -168,35 +167,21 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap
private void onPut() {
cnt.incrementAndGet();
- int c;
+ IgniteBiInClosure<K, V> lsnr = this.lsnr;
- while ((c = cnt.get()) > max) {
- // Decrement count.
- if (cnt.compareAndSet(c, c - 1)) {
- try {
- K key = firstEntry().getKey();
+ int delta = cnt.get() - max;
- V val;
+ for (int i = 0; i < delta && cnt.get() > max; i++) {
+ Entry<K, V> e = pollFirstEntry();
- // Make sure that an element is removed.
- while ((val = super.remove(firstEntry().getKey())) == null) {
- // No-op.
- }
+ if (e == null)
+ return;
- assert val != null;
-
- IgniteBiInClosure<K, V> lsnr = this.lsnr;
-
- // Listener notification.
- if (lsnr != null)
- lsnr.apply(key, val);
- }
- catch (NoSuchElementException ignored) {
- cnt.incrementAndGet();
+ cnt.decrementAndGet();
- return;
- }
- }
+ // Listener notification.
+ if (lsnr != null)
+ lsnr.apply(e.getKey(), e.getValue());
}
}
@@ -251,4 +236,4 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap
return rmvd;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java
index 6e0e876..d1a7bb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java
@@ -18,18 +18,12 @@
package org.apache.ignite.internal.util;
import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.IgniteSystemProperties;
import org.jsr166.ConcurrentHashMap8;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAP_CONCURRENCY_LEVEL;
-
/**
* Concurrent map factory.
*/
public class GridConcurrentFactory {
- /** Default concurrency level. */
- private static final int CONCURRENCY_LEVEL = IgniteSystemProperties.getInteger(IGNITE_MAP_CONCURRENCY_LEVEL, 256);
-
/**
* Ensure singleton.
*/
@@ -43,7 +37,6 @@ public class GridConcurrentFactory {
* @return New concurrent map.
*/
public static <K, V> ConcurrentMap<K, V> newMap() {
- return new ConcurrentHashMap8<>(16 * CONCURRENCY_LEVEL, 0.75f, CONCURRENCY_LEVEL);
+ return new ConcurrentHashMap8<>();
}
-
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java
index 5a53b4b..0c76787 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java
@@ -24,8 +24,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
-import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.SINGLE_Q;
-
/**
* Concurrent linked set implementation.
*/
@@ -123,9 +121,6 @@ public class GridConcurrentLinkedHashSet<E> extends GridSetWrapper<E> {
/** {@inheritDoc} */
@Override public String toString() {
- // TODO GG-4788
- return ((ConcurrentLinkedHashMap)map()).policy() != SINGLE_Q ?
- S.toString(GridConcurrentLinkedHashSet.class, this) :
- S.toString(GridConcurrentLinkedHashSet.class, this, "elements", map().keySet());
+ return S.toString(GridConcurrentLinkedHashSet.class, this, "elements", map().keySet());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
index 4ca00d9..d9ffdd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.util;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
+
/**
*
*/
@@ -29,7 +31,7 @@ public final class IgniteUuidCache {
/** Cache. */
private static final ConcurrentMap<UUID, UUID> cache =
- new GridBoundedConcurrentLinkedHashMap<>(MAX, 1024, 0.75f, 64);
+ new GridBoundedConcurrentLinkedHashMap<>(MAX, 1024, 0.75f, 64, PER_SEGMENT_Q);
/**
* Gets cached UUID to preserve memory.
@@ -56,4 +58,4 @@ public final class IgniteUuidCache {
private IgniteUuidCache() {
// No-op.
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 0a6d9aa..31674f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -17,11 +17,9 @@
package org.apache.ignite.internal.util.future;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicMarkableReference;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -35,7 +33,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
/**
* Future composed of multiple inner futures.
@@ -44,33 +41,38 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
/** */
private static final long serialVersionUID = 0L;
- /** Futures. */
- private final ConcurrentLinkedDeque8<IgniteInternalFuture<T>> futs = new ConcurrentLinkedDeque8<>();
+ /** */
+ private static final int INITED = 0b1;
- /** Pending futures. */
- private final Collection<IgniteInternalFuture<T>> pending = new ConcurrentLinkedDeque8<>();
+ /** */
+ private static final AtomicIntegerFieldUpdater<GridCompoundFuture> flagsUpd =
+ AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "flags");
- /** Listener call count. */
- private final AtomicInteger lsnrCalls = new AtomicInteger();
+ /** */
+ private static final AtomicIntegerFieldUpdater<GridCompoundFuture> lsnrCallsUpd =
+ AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls");
- /** Finished flag. */
- private final AtomicBoolean finished = new AtomicBoolean();
+ /** Futures. */
+ private final Collection<IgniteInternalFuture<T>> futs = new ArrayList<>();
/** Reducer. */
@GridToStringInclude
private IgniteReducer<T, R> rdc;
- /** Initialize flag. */
- private AtomicBoolean init = new AtomicBoolean(false);
-
- /** Result with a flag to control if reducer has been called. */
- private AtomicMarkableReference<R> res = new AtomicMarkableReference<>(null, false);
-
/** Exceptions to ignore. */
private Class<? extends Throwable>[] ignoreChildFailures;
- /** Error. */
- private AtomicReference<Throwable> err = new AtomicReference<>();
+ /**
+ * Updated via {@link #flagsUpd}.
+ *
+ * @see #INITED
+ */
+ @SuppressWarnings("unused")
+ private volatile int flags;
+
+ /** Updated via {@link #lsnrCallsUpd}. */
+ @SuppressWarnings("unused")
+ private volatile int lsnrCalls;
/**
*
@@ -104,7 +106,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
/** {@inheritDoc} */
@Override public boolean cancel() throws IgniteCheckedException {
if (onCancelled()) {
- for (IgniteInternalFuture<T> fut : futs)
+ for (IgniteInternalFuture<T> fut : futures())
fut.cancel();
return true;
@@ -118,8 +120,26 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
*
* @return Collection of futures.
*/
+ private Collection<IgniteInternalFuture<T>> futures(boolean pending) {
+ synchronized (futs) {
+ Collection<IgniteInternalFuture<T>> res = new ArrayList<>(futs.size());
+
+ for (IgniteInternalFuture<T> fut : futs) {
+ if (!pending || !fut.isDone())
+ res.add(fut);
+ }
+
+ return res;
+ }
+ }
+
+ /**
+ * Gets collection of futures.
+ *
+ * @return Collection of futures.
+ */
public Collection<IgniteInternalFuture<T>> futures() {
- return futs;
+ return futures(false);
}
/**
@@ -128,7 +148,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* @return Pending futures.
*/
public Collection<IgniteInternalFuture<T>> pending() {
- return pending;
+ return futures(true);
}
/**
@@ -147,7 +167,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* @return {@code True} if there are pending futures.
*/
public boolean hasPending() {
- return !pending.isEmpty();
+ return !pending().isEmpty();
}
/**
@@ -155,7 +175,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* {@link #markInitialized()} method is called on future.
*/
public boolean initialized() {
- return init.get();
+ return flagSet(INITED);
}
/**
@@ -166,18 +186,20 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
public void add(IgniteInternalFuture<T> fut) {
assert fut != null;
- pending.add(fut);
- futs.add(fut);
+ synchronized (futs) {
+ futs.add(fut);
+ }
fut.listen(new Listener());
- if (isCancelled())
+ if (isCancelled()) {
try {
fut.cancel();
}
catch (IgniteCheckedException e) {
onDone(e);
}
+ }
}
/**
@@ -185,7 +207,8 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
*
* @param futs Futures to add.
*/
- public void addAll(@Nullable IgniteInternalFuture<T>... futs) {
+ @SafeVarargs
+ public final void addAll(@Nullable IgniteInternalFuture<T>... futs) {
addAll(F.asList(futs));
}
@@ -195,9 +218,10 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* @param futs Futures to add.
*/
public void addAll(@Nullable Iterable<IgniteInternalFuture<T>> futs) {
- if (futs != null)
+ if (futs != null) {
for (IgniteInternalFuture<T> fut : futs)
add(fut);
+ }
}
/**
@@ -219,10 +243,34 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
}
/**
+ * @param flag Flag to CAS.
+ * @return {@code True} if CAS succeeds.
+ */
+ private boolean casFlag(int flag) {
+ for (;;) {
+ int flags0 = flags;
+
+ if ((flags0 & flag) != 0)
+ return false;
+
+ if (flagsUpd.compareAndSet(this, flags0, flags0 | flag))
+ return true;
+ }
+ }
+
+ /**
+ * @param flag Flag to check.
+ * @return {@code True} if set.
+ */
+ private boolean flagSet(int flag) {
+ return (flags & flag) != 0;
+ }
+
+ /**
* Mark this future as initialized.
*/
public void markInitialized() {
- if (init.compareAndSet(false, true))
+ if (casFlag(INITED))
// Check complete to make sure that we take care
// of all the ignored callbacks.
checkComplete();
@@ -232,22 +280,14 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* Check completeness of the future.
*/
private void checkComplete() {
- Throwable err = this.err.get();
-
- boolean ignore = ignoreFailure(err);
-
- if (init.get() && (res.isMarked() || lsnrCalls.get() == futs.sizex() || (err != null && !ignore))
- && finished.compareAndSet(false, true)) {
+ if (flagSet(INITED) && !isDone() && lsnrCalls == futuresSize()) {
try {
- if (err == null && rdc != null && !res.isMarked())
- res.compareAndSet(null, rdc.reduce(), false, true);
+ onDone(rdc != null ? rdc.reduce() : null);
}
catch (RuntimeException e) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
onDone(e);
-
- return;
}
catch (AssertionError e) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
@@ -256,8 +296,15 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
throw e;
}
+ }
+ }
- onDone(res.getReference(), ignore ? null : err);
+ /**
+ * @return Futures size.
+ */
+ private int futuresSize() {
+ synchronized (futs) {
+ return futs.size();
}
}
@@ -288,7 +335,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
"cancelled", isCancelled(),
"err", error(),
"futs",
- F.viewReadOnly(futs, new C1<IgniteInternalFuture<T>, String>() {
+ F.viewReadOnly(futures(), new C1<IgniteInternalFuture<T>, String>() {
@Override public String apply(IgniteInternalFuture<T> f) {
return Boolean.toString(f.isDone());
}
@@ -305,14 +352,12 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
/** {@inheritDoc} */
@Override public void apply(IgniteInternalFuture<T> fut) {
- pending.remove(fut);
-
try {
T t = fut.get();
try {
- if (rdc != null && !rdc.collect(t) && !res.isMarked())
- res.compareAndSet(null, rdc.reduce(), false, true);
+ if (rdc != null && !rdc.collect(t))
+ onDone(rdc.reduce());
}
catch (RuntimeException e) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
@@ -331,18 +376,20 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
}
catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException |
ClusterTopologyCheckedException e) {
- err.compareAndSet(null, e);
+ if (!ignoreFailure(e))
+ onDone(e);
}
catch (IgniteCheckedException e) {
- if (!ignoreFailure(e))
+ if (!ignoreFailure(e)) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
- err.compareAndSet(null, e);
+ onDone(e);
+ }
}
catch (RuntimeException e) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
- err.compareAndSet(null, e);
+ onDone(e);
}
catch (AssertionError e) {
U.error(null, "Failed to execute compound future reducer: " + this, e);
@@ -353,7 +400,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
throw e;
}
- lsnrCalls.incrementAndGet();
+ lsnrCallsUpd.incrementAndGet(GridCompoundFuture.this);
checkComplete();
}
@@ -363,4 +410,4 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
return "Compound future listener: " + GridCompoundFuture.this;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
index d93f12e..b3747d7 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
@@ -3805,4 +3805,4 @@ public class ConcurrentHashMap8<K, V>
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
index 75db13c..28e38d7 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
@@ -1735,4 +1735,4 @@ public class ConcurrentLinkedDeque8<E>
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
index 5b7381e..e8f8e0f 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
@@ -17,7 +17,6 @@ import java.util.AbstractSet;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.ConcurrentModificationException;
-import java.util.Deque;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
@@ -28,6 +27,9 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
@@ -264,12 +266,14 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
private volatile V val;
/** Reference to a node in queue for fast removal operations. */
+ @GridToStringExclude
private volatile ConcurrentLinkedDeque8.Node node;
/** Modification count of the map for duplicates exclusion. */
private volatile int modCnt;
/** Link to the next entry in a bucket */
+ @GridToStringExclude
private final HashEntry<K, V> next;
/**
@@ -332,6 +336,11 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
static <K, V> HashEntry<K, V>[] newArray(int i) {
return new HashEntry[i];
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HashEntry.class, this, "key", key, "val", val);
+ }
}
/**
@@ -749,7 +758,7 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
recordInsert(e, (ConcurrentLinkedDeque8)segEntryQ);
if (maxCap > 0)
- checkRemoveEldestEntrySegment();
+ checkRemoveEldestEntrySegment(c);
break;
@@ -757,7 +766,7 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
segEntryQ.add(e);
if (maxCap > 0)
- checkRemoveEldestEntrySegment();
+ checkRemoveEldestEntrySegment(c);
break;
@@ -779,23 +788,21 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
}
/**
- *
+ * @param cnt Segment entries count.
*/
- private void checkRemoveEldestEntrySegment() {
+ private void checkRemoveEldestEntrySegment(int cnt) {
assert maxCap > 0;
- int rmvCnt = sizex() - maxCap;
-
- for (int i = 0; i < rmvCnt; i++) {
+ if (cnt - ((maxCap / segments.length) + 1) > 0) {
HashEntry<K, V> e0 = segEntryQ.poll();
- if (e0 == null)
- break;
-
- removeLocked(e0.key, e0.hash, null /*no need to compare*/, false);
+ assert e0 != null;
- if (sizex() <= maxCap)
- break;
+ removeLocked(
+ e0.key,
+ e0.hash,
+ null /*no need to compare*/,
+ false);
}
}
@@ -1812,34 +1819,22 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
* @param asc {@code True} for ascending iterator.
*/
HashIterator(boolean asc) {
- // TODO GG-4788 - Need to fix iterators for ConcurrentLinkedHashMap in perSegment mode
- if (qPlc != SINGLE_Q)
- throw new IllegalStateException("Iterators are not supported in 'perSegmentQueue' modes.");
-
modCnt = ConcurrentLinkedHashMap.this.modCnt.intValue();
// Init delegate.
- delegate = asc ? entryQ.iterator() : entryQ.descendingIterator();
-
- advance();
- }
+ switch (qPlc) {
+ case SINGLE_Q:
+ delegate = asc ? entryQ.iterator() : entryQ.descendingIterator();
- /**
- * @return Copy of the queue.
- */
- private Deque<HashEntry<K, V>> copyQueue() {
- int i = entryQ.sizex();
-
- Deque<HashEntry<K, V>> res = new ArrayDeque<>(i);
-
- Iterator<HashEntry<K, V>> iter = entryQ.iterator();
+ break;
- while (iter.hasNext() && i-- >= 0)
- res.add(iter.next());
+ default:
+ assert qPlc == PER_SEGMENT_Q || qPlc == PER_SEGMENT_Q_OPTIMIZED_RMV : qPlc;
- assert !iter.hasNext() : "Entries queue has been modified.";
+ delegate = new HashIteratorDelegate();
+ }
- return res;
+ advance();
}
/**
@@ -1901,6 +1896,130 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
}
/**
+ *
+ */
+ private class HashIteratorDelegate implements Iterator<HashEntry<K, V>> {
+ /** */
+ private HashEntry<K, V>[] curTbl;
+
+ /** */
+ private int nextSegIdx;
+
+ /** */
+ private int nextTblIdx;
+
+ /** */
+ private HashEntry<K, V> next;
+
+ /** */
+ private HashEntry<K, V> next0;
+
+ /** */
+ private HashEntry<K, V> cur;
+
+ /**
+ *
+ */
+ public HashIteratorDelegate() {
+ nextSegIdx = segments.length - 1;
+ nextTblIdx = -1;
+
+ advance();
+ }
+
+ /**
+ *
+ */
+ private void advance() {
+ if (next0 != null && advanceInBucket(next0, true))
+ return;
+
+ while (nextTblIdx >= 0) {
+ HashEntry<K, V> bucket = curTbl[nextTblIdx--];
+
+ if (bucket != null && advanceInBucket(bucket, false))
+ return;
+ }
+
+ while (nextSegIdx >= 0) {
+ int nextSegIdx0 = nextSegIdx--;
+
+ Segment seg = segments[nextSegIdx0];
+
+ curTbl = seg.tbl;
+
+ for (int j = curTbl.length - 1; j >= 0; --j) {
+ HashEntry<K, V> bucket = curTbl[j];
+
+ if (bucket != null && advanceInBucket(bucket, false)) {
+ nextTblIdx = j - 1;
+
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * @param e Current next.
+ * @return {@code True} if advance succeeded.
+ */
+ @SuppressWarnings( {"unchecked"})
+ private boolean advanceInBucket(@Nullable HashEntry<K, V> e, boolean skipFirst) {
+ if (e == null)
+ return false;
+
+ next0 = e;
+
+ do {
+ if (!skipFirst) {
+ next = next0;
+
+ return true;
+ }
+ else
+ skipFirst = false;
+ }
+ while ((next0 = next0.next) != null);
+
+ assert next0 == null;
+
+ next = null;
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return next != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HashEntry<K, V> next() {
+ HashEntry<K, V> e = next;
+
+ if (e == null)
+ throw new NoSuchElementException();
+
+ advance();
+
+ return e;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ if (cur == null)
+ throw new IllegalStateException();
+
+ HashEntry<K, V> e = cur;
+
+ cur = null;
+
+ ConcurrentLinkedHashMap.this.remove(e.key, e.val);
+ }
+ }
+
+ /**
* Key iterator implementation.
*/
private final class KeyIterator extends HashIterator implements Iterator<K>, Enumeration<K> {
@@ -2154,13 +2273,17 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
* the fastest "natural" evicts for bounded maps.
* <p>
* NOTE: Remove operations on map are slower than with other policies.
+ * <p>
+ * NOTE: Iteration order is not preserved, i.e. iteration goes as if it was ordinary hash map.
*/
PER_SEGMENT_Q,
/**
* Instance of {@code GridConcurrentLinkedDequeue} is created for each segment. This gives
* faster "natural" evicts for bounded queues and better remove operation times.
+ * <p>
+ * NOTE: Iteration order is not preserved, i.e. iteration goes as if it was ordinary hash map.
*/
PER_SEGMENT_Q_OPTIMIZED_RMV
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java
index 50ba241..616fd43 100644
--- a/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java
@@ -26,12 +26,18 @@ import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
/**
* Tests affinity function with different number of backups.
*/
public class GridCacheAffinityBackupsSelfTest extends GridCommonAbstractTest {
+ /** */
+ private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
/** Number of backups. */
private int backups;
@@ -45,6 +51,8 @@ public class GridCacheAffinityBackupsSelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setCacheMode(CacheMode.PARTITIONED);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 3a530f2..1d79e20 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -4696,7 +4696,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
TransactionIsolation txIsolation)
throws Exception
{
- log.info("Test tx skip store [concurrency=" + txConcurrency + ", isolation=" + txIsolation + ']');
+ info("Test tx skip store [concurrency=" + txConcurrency + ", isolation=" + txIsolation + ']');
cache.removeAll(data.keySet());
checkEmpty(cache, cacheSkipStore);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java
index 6a0b9ad..19e49f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -32,13 +33,14 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
*
*/
public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTest {
/** */
- private volatile Integer failedKey;
+ private volatile boolean putFailed;
/** */
private String maxCompletedTxCount;
@@ -67,6 +69,7 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes
ccfg.setCacheMode(PARTITIONED);
ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
cfg.setCacheConfiguration(ccfg);
@@ -90,43 +93,48 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes
final AtomicInteger keyStart = new AtomicInteger();
+ final ConcurrentLinkedDeque<Integer> q = new ConcurrentLinkedDeque<>();
+
GridTestUtils.runMultiThreaded(new Callable<Object>() {
@Override public Object call() throws Exception {
int start = keyStart.getAndAdd(KEYS_PER_THREAD);
- for (int i = 0; i < KEYS_PER_THREAD && failedKey == null; i++) {
+ for (int i = 0; i < KEYS_PER_THREAD && !putFailed; i++) {
int key = start + i;
try {
cache.put(key, 1);
}
catch (Exception e) {
- log.info("Put failed: " + e);
+ log.info("Put failed [err=" + e + ", i=" + i + ']');
+
+ putFailed = true;
- failedKey = key;
+ q.add(key);
}
}
-
return null;
}
}, 10, "put-thread");
- assertNotNull("Test failed to provoke 'missing commit version' error.", failedKey);
+ assertTrue("Test failed to provoke 'missing commit version' error.", putFailed);
- log.info("Trying to update " + failedKey);
+ for (Integer key : q) {
+ log.info("Trying to update " + key);
- IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
+ IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
- asyncCache.put(failedKey, 2);
+ asyncCache.put(key, 2);
- IgniteFuture<?> fut = asyncCache.future();
+ IgniteFuture<?> fut = asyncCache.future();
- try {
- fut.get(5000);
- }
- catch (IgniteFutureTimeoutException ignore) {
- fail("Put failed to finish in 5s.");
+ try {
+ fut.get(5000);
+ }
+ catch (IgniteFutureTimeoutException ignore) {
+ fail("Put failed to finish in 5s: " + key);
+ }
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index abb2767..b93acf5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -541,7 +541,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
@Nullable IgniteInternalTx tx,
UUID evtNodeId,
UUID affNodeId,
- boolean writeThrough,
boolean retval,
boolean evt,
boolean metrics,
@@ -894,4 +893,4 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
@Override public void onUnlock() {
// No-op.
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
index a2440e2..ad51600 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
@@ -513,4 +513,4 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
assertTrue(((IgniteKernal)ignite).internalCache().context().isNear());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
index a2308c6..8f5e07b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -1090,4 +1090,4 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
assertEquals("Not stopped IDs: " + notStopped, 0, notStopped.size());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java
index e6dc7e6..8ce7ae3 100644
--- a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java
@@ -52,4 +52,4 @@ public class GridBoundedConcurrentLinkedHashMapSelfTest extends GridCommonAbstra
assert it.next() == 9;
assert it.next() == 10;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java
index a09ba15..7bcbd07 100644
--- a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java
@@ -19,13 +19,18 @@ package org.apache.ignite.lang.utils;
import java.util.Date;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jsr166.ConcurrentLinkedHashMap;
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
+
/**
* This class tests basic contracts of {@code ConcurrentLinkedHashMap}.
*/
@@ -264,4 +269,59 @@ public class GridConcurrentLinkedHashMapSelfTest extends GridCommonAbstractTest
assert nextVal == -1 : "Unexpected value: " + nextVal;
}
-}
\ No newline at end of file
+
+ /**
+ *
+ */
+ public void testIterationInPerSegmentModes() {
+ checkIteration(PER_SEGMENT_Q);
+ checkIteration(PER_SEGMENT_Q_OPTIMIZED_RMV);
+ }
+
+ /**
+ * @param plc Policy.
+ */
+ private void checkIteration(ConcurrentLinkedHashMap.QueuePolicy plc) {
+ ConcurrentLinkedHashMap<Integer, Integer> map =
+ new ConcurrentLinkedHashMap<>(10,
+ 0.75f,
+ 16,
+ 0,
+ plc);
+
+ Map<Integer, Integer> map0 = new HashMap<>();
+
+ int cnt = 0;
+
+ for (int i = 0; i < 100_000; i++) {
+ int key = ThreadLocalRandom.current().nextInt(15000);
+ int val = ThreadLocalRandom.current().nextInt(15000);
+
+ Integer rmv0 = map0.put(key, val);
+
+ if (rmv0 == null)
+ cnt++;
+
+ Integer rmv = map.put(key, val);
+
+ assertEquals(rmv0, rmv);
+ }
+
+ int checkCnt = 0;
+
+ for (Map.Entry<Integer, Integer> e : map.entrySet()) {
+ checkCnt++;
+
+ Integer rmv = map0.remove(e.getKey());
+
+ assertNotNull(rmv);
+ assertEquals(rmv, e.getValue());
+ }
+
+ assertEquals(cnt, checkCnt);
+
+ info("Puts count: " + cnt);
+
+ assert map0.isEmpty() : map0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 71f3ee3..c19e718 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -95,7 +95,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
* Super class for all common tests.
*/
public abstract class GridCommonAbstractTest extends GridAbstractTest {
- /**Cache peek modes array that consist of only ONHEAP mode. */
+ /** Cache peek modes array that consist of only ONHEAP mode. */
protected static final CachePeekMode[] ONHEAP_PEEK_MODES = new CachePeekMode[] {CachePeekMode.ONHEAP};
/**
@@ -1087,4 +1087,4 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
}
}
}
-}
\ No newline at end of file
+}