You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/10 15:55:25 UTC
[18/53] [abbrv] ignite git commit: ignite-4535 : Removed sync evicts.
ignite-4535 : Removed sync evicts.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/65d816f3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/65d816f3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/65d816f3
Branch: refs/heads/ignite-3477-master
Commit: 65d816f39a6985c2cc2104960ec1f41574e4e3a6
Parents: f2df0a7
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Fri Mar 31 17:20:27 2017 +0300
Committer: Ilya Lantukh <il...@gridgain.com>
Committed: Fri Mar 31 17:20:27 2017 +0300
----------------------------------------------------------------------
.../processors/cache/CacheEvictionManager.java | 15 -
.../processors/cache/CacheMetricsImpl.java | 9 +-
.../cache/CacheOffheapEvictionManager.java | 15 -
.../processors/cache/GridCacheAdapter.java | 35 +-
.../cache/GridCacheEvictionManager.java | 1474 +-----------------
.../processors/cache/GridCacheUtils.java | 8 -
6 files changed, 43 insertions(+), 1513 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/65d816f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictionManager.java
index d536a98..b614728 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictionManager.java
@@ -41,11 +41,6 @@ public interface CacheEvictionManager extends GridCacheManager {
public void touch(GridCacheEntryEx e, AffinityTopologyVersion topVer);
/**
- * Notifications.
- */
- public void unwind();
-
- /**
* @param entry Entry to attempt to evict.
* @param obsoleteVer Obsolete version.
* @param filter Optional entry filter.
@@ -60,19 +55,9 @@ public interface CacheEvictionManager extends GridCacheManager {
@Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException;
/**
- * @return Current size of evict queue.
- */
- public int evictQueueSize();
-
- /**
* @param keys Keys to evict.
* @param obsoleteVer Obsolete version.
* @throws IgniteCheckedException In case of error.
*/
public void batchEvict(Collection<?> keys, @Nullable GridCacheVersion obsoleteVer) throws IgniteCheckedException;
-
- /**
- * @return {@code True} if either evicts or near evicts are synchronized, {@code false} otherwise.
- */
- public boolean evictSyncOrNearSync();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/65d816f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 534b6b2..dbe53ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -322,14 +322,7 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public int getDhtEvictQueueCurrentSize() {
- GridCacheContext<?, ?> ctx = cctx.isNear() ? dhtCtx : cctx;
-
- if (ctx == null)
- return -1;
-
- CacheEvictionManager evictMgr = ctx.evicts();
-
- return evictMgr != null ? evictMgr.evictQueueSize() : -1;
+ return -1;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/65d816f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
index 6c925ad..e242b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
@@ -61,11 +61,6 @@ public class CacheOffheapEvictionManager extends GridCacheManagerAdapter impleme
}
/** {@inheritDoc} */
- @Override public void unwind() {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Override public boolean evict(@Nullable GridCacheEntryEx entry,
@Nullable GridCacheVersion obsoleteVer,
boolean explicit,
@@ -74,17 +69,7 @@ public class CacheOffheapEvictionManager extends GridCacheManagerAdapter impleme
}
/** {@inheritDoc} */
- @Override public int evictQueueSize() {
- return 0;
- }
-
- /** {@inheritDoc} */
@Override public void batchEvict(Collection<?> keys, @Nullable GridCacheVersion obsoleteVer) {
// No-op.
}
-
- /** {@inheritDoc} */
- @Override public boolean evictSyncOrNearSync() {
- return false;
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/65d816f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 4a51c53..6b54230 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2308,8 +2308,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
public IgniteInternalFuture<V> getAndPutAsync0(final K key,
final V val,
- @Nullable final CacheEntryPredicate filter)
- {
+ @Nullable final CacheEntryPredicate filter) {
return asyncOp(new AsyncOp<V>() {
@Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.putAsync(ctx, readyTopVer, key, val, true, filter)
@@ -2865,7 +2864,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return syncOp(new SyncOp<V>(true) {
@Override public V op(GridNearTxLocal tx) throws IgniteCheckedException {
- K key0 = keepBinary ? (K) ctx.toCacheKeyObject(key) : key;
+ K key0 = keepBinary ? (K)ctx.toCacheKeyObject(key) : key;
V ret = tx.removeAllAsync(ctx,
null,
@@ -2875,9 +2874,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/*singleRmv*/false).get().value();
if (ctx.config().getInterceptor() != null) {
- K key = keepBinary ? (K) ctx.unwrapBinaryIfNeeded(key0, true, false) : key0;
+ K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false) : key0;
- return (V) ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2();
+ return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2();
}
return ret;
@@ -3948,7 +3947,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public Iterator<Cache.Entry<K, V>> scanIterator(boolean keepBinary, @Nullable IgniteBiPredicate<Object, Object> p)
+ @Override public Iterator<Cache.Entry<K, V>> scanIterator(boolean keepBinary,
+ @Nullable IgniteBiPredicate<Object, Object> p)
throws IgniteCheckedException {
return igniteIterator(keepBinary, p);
}
@@ -3975,7 +3975,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Distributed ignite cache iterator.
* @throws IgniteCheckedException If failed.
*/
- private Iterator<Cache.Entry<K, V>> igniteIterator(boolean keepBinary, @Nullable IgniteBiPredicate<Object, Object> p)
+ private Iterator<Cache.Entry<K, V>> igniteIterator(boolean keepBinary,
+ @Nullable IgniteBiPredicate<Object, Object> p)
throws IgniteCheckedException {
GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
@@ -4518,17 +4519,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
GridCacheVersion obsoleteVer = ctx.versions().next();
- if (!ctx.evicts().evictSyncOrNearSync()) {
- try {
- ctx.evicts().batchEvict(keys, obsoleteVer);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to perform batch evict for keys: " + keys, e);
- }
+ try {
+ ctx.evicts().batchEvict(keys, obsoleteVer);
}
- else {
- for (K k : keys)
- evictx(k, obsoleteVer, CU.empty0());
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to perform batch evict for keys: " + keys, e);
}
}
@@ -4547,7 +4542,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Cached value.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public final V get(K key, boolean deserializeBinary, final boolean needVer) throws IgniteCheckedException {
+ @Nullable public final V get(K key, boolean deserializeBinary,
+ final boolean needVer) throws IgniteCheckedException {
String taskName = ctx.kernalContext().job().currentTaskName();
return get0(key, taskName, deserializeBinary, needVer);
@@ -5377,7 +5373,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param peekModes Cache peek modes.
* @param partition partition.
*/
- private PartitionSizeLongJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes, int partition) {
+ private PartitionSizeLongJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes,
+ int partition) {
super(cacheName, topVer);
this.peekModes = peekModes;
http://git-wip-us.apache.org/repos/asf/ignite/blob/65d816f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index d752b53..4294578 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -110,57 +110,15 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
/** Eviction filter. */
private EvictionFilter filter;
- /** Eviction buffer. */
- private final ConcurrentLinkedDeque8<EvictionInfo> bufEvictQ = new ConcurrentLinkedDeque8<>();
-
- /** Active eviction futures. */
- private final Map<Long, EvictionFuture> futs = new ConcurrentHashMap8<>();
-
- /** Futures count modification lock. */
- private final Lock futsCntLock = new ReentrantLock();
-
- /** Futures count condition. */
- private final Condition futsCntCond = futsCntLock.newCondition();
-
- /** Active futures count. */
- private volatile int activeFutsCnt;
-
- /** Max active futures count. */
- private int maxActiveFuts;
-
- /** Generator of future IDs. */
- private final AtomicLong idGen = new AtomicLong();
-
- /** Evict backup synchronized flag. */
- private boolean evictSync;
-
- /** Evict near synchronized flag. */
- private boolean nearSync;
-
- /** Flag to hold {@code evictSync || nearSync} result. */
- private boolean evictSyncAgr;
-
/** Policy enabled. */
private boolean plcEnabled;
- /** Backup entries worker. */
- private BackupWorker backupWorker;
-
- /** Backup entries worker thread. */
- private IgniteThread backupWorkerThread;
-
/** Busy lock. */
private final GridBusyLock busyLock = new GridBusyLock();
- /** Stopping flag. */
- private volatile boolean stopping;
-
/** Stopped flag. */
private boolean stopped;
- /** Current future. */
- private final AtomicReference<EvictionFuture> curEvictFut = new AtomicReference<>();
-
/** First eviction flag. */
private volatile boolean firstEvictWarn;
@@ -180,147 +138,17 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
if (cfg.getEvictSynchronizedKeyBufferSize() < 0)
throw new IgniteCheckedException("Configuration parameter 'evictSynchronizedKeyBufferSize' cannot be negative.");
- if (!cctx.isLocal()) {
- evictSync = cfg.isEvictSynchronized() && !cctx.isNear();
-
- nearSync = isNearEnabled(cctx) && !cctx.isNear() && cfg.isEvictSynchronized();
- }
- else {
- if (cfg.isEvictSynchronized())
- U.warn(log, "Ignored 'evictSynchronized' configuration property for LOCAL cache: " + cctx.namexx());
-
- if (cfg.getNearConfiguration() != null && cfg.isEvictSynchronized())
- U.warn(log, "Ignored 'evictNearSynchronized' configuration property for LOCAL cache: " + cctx.namexx());
- }
-
- if (cctx.isDht() && !nearSync && evictSync && isNearEnabled(cctx))
- throw new IgniteCheckedException("Illegal configuration (may lead to data inconsistency) " +
- "[evictSync=true, evictNearSync=false]");
-
- reportConfigurationProblems();
-
- evictSyncAgr = evictSync || nearSync;
-
- if (evictSync && !cctx.isNear() && plcEnabled) {
- backupWorker = new BackupWorker();
-
- cctx.events().addListener(
- new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT ||
- evt.type() == EVT_NODE_JOINED;
-
- DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
- // Notify backup worker on each topology change.
- if (cctx.discovery().cacheAffinityNode(discoEvt.eventNode(), cctx.name()))
- backupWorker.addEvent(discoEvt);
- }
- },
- EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED);
- }
-
- if (evictSyncAgr) {
- if (cfg.getEvictSynchronizedTimeout() <= 0)
- throw new IgniteCheckedException("Configuration parameter 'evictSynchronousTimeout' should be positive.");
-
- if (cfg.getEvictSynchronizedConcurrencyLevel() <= 0)
- throw new IgniteCheckedException("Configuration parameter 'evictSynchronousConcurrencyLevel' " +
- "should be positive.");
-
- maxActiveFuts = cfg.getEvictSynchronizedConcurrencyLevel();
-
- cctx.io().addHandler(cctx.cacheId(), GridCacheEvictionRequest.class, new CI2<UUID, GridCacheEvictionRequest>() {
- @Override public void apply(UUID nodeId, GridCacheEvictionRequest msg) {
- processEvictionRequest(nodeId, msg);
- }
- });
-
- cctx.io().addHandler(cctx.cacheId(), GridCacheEvictionResponse.class, new CI2<UUID, GridCacheEvictionResponse>() {
- @Override public void apply(UUID nodeId, GridCacheEvictionResponse msg) {
- processEvictionResponse(nodeId, msg);
- }
- });
-
- cctx.events().addListener(
- new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
-
- DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
- for (EvictionFuture fut : futs.values())
- fut.onNodeLeft(discoEvt.eventNode().id());
- }
- },
- EVT_NODE_FAILED, EVT_NODE_LEFT);
- }
-
if (log.isDebugEnabled())
log.debug("Eviction manager started on node: " + cctx.nodeId());
}
- /**
- * Outputs warnings if potential configuration problems are detected.
- */
- private void reportConfigurationProblems() {
- CacheMode mode = cctx.config().getCacheMode();
-
- if (plcEnabled && !cctx.isNear() && mode == PARTITIONED) {
- if (!evictSync) {
- U.warn(log, "Evictions are not synchronized with other nodes in topology " +
- "which provides 2x-3x better performance but may cause data inconsistency if cache store " +
- "is not configured (consider changing 'evictSynchronized' configuration property).",
- "Evictions are not synchronized for cache: " + cctx.namexx());
- }
-
- if (!nearSync && isNearEnabled(cctx)) {
- U.warn(log, "Evictions on primary node are not synchronized with near caches on other nodes " +
- "which provides 2x-3x better performance but may cause data inconsistency (consider changing " +
- "'nearEvictSynchronized' configuration property).",
- "Evictions are not synchronized with near caches on other nodes for cache: " + cctx.namexx());
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void onKernalStart0() throws IgniteCheckedException {
- super.onKernalStart0();
-
- if (plcEnabled && evictSync && !cctx.isNear()) {
- // Add dummy event to worker.
- DiscoveryEvent evt = cctx.discovery().localJoinEvent();
-
- backupWorker.addEvent(evt);
-
- backupWorkerThread = new IgniteThread(backupWorker);
- backupWorkerThread.start();
- }
- }
-
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
super.onKernalStop0(cancel);
- // Change stopping first.
- stopping = true;
-
busyLock.block();
try {
- // Stop backup worker.
- if (evictSync && !cctx.isNear() && backupWorker != null) {
- backupWorker.cancel();
-
- U.join(
- backupWorkerThread,
- log);
- }
-
- // Cancel all active futures.
- for (EvictionFuture fut : futs.values())
- fut.cancel();
-
if (log.isDebugEnabled())
log.debug("Eviction manager stopped on node: " + cctx.nodeId());
}
@@ -332,42 +160,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
}
/**
- * @return Current size of evict queue.
- */
- public int evictQueueSize() {
- return bufEvictQ.sizex();
- }
-
- /**
- * @param nodeId Sender node ID.
- * @param res Response.
- */
- private void processEvictionResponse(UUID nodeId, GridCacheEvictionResponse res) {
- assert nodeId != null;
- assert res != null;
-
- if (log.isDebugEnabled())
- log.debug("Processing eviction response [node=" + nodeId + ", localNode=" + cctx.nodeId() +
- ", res=" + res + ']');
-
- if (!enterBusy())
- return;
-
- try {
- EvictionFuture fut = futs.get(res.futureId());
-
- if (fut != null)
- fut.onResponse(nodeId, res);
- else if (log.isDebugEnabled())
- log.debug("Eviction future for response is not found [res=" + res + ", node=" + nodeId +
- ", localNode=" + cctx.nodeId() + ']');
- }
- finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
* @return {@code True} if entered busy.
*/
private boolean enterBusy() {
@@ -384,143 +176,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
}
/**
- * @param nodeId Sender node ID.
- * @param req Request.
- */
- private void processEvictionRequest(UUID nodeId, GridCacheEvictionRequest req) {
- assert nodeId != null;
- assert req != null;
-
- if (!enterBusy())
- return;
-
- try {
- if (req.classError() != null) {
- if (log.isDebugEnabled())
- log.debug("Class got undeployed during eviction: " + req.classError());
-
- sendEvictionResponse(nodeId, new GridCacheEvictionResponse(cctx.cacheId(), req.futureId(), true));
-
- return;
- }
-
- AffinityTopologyVersion topVer = lockTopology();
-
- try {
- if (!topVer.equals(req.topologyVersion())) {
- if (log.isDebugEnabled())
- log.debug("Topology version is different [locTopVer=" + topVer +
- ", rmtTopVer=" + req.topologyVersion() + ']');
-
- sendEvictionResponse(nodeId,
- new GridCacheEvictionResponse(cctx.cacheId(), req.futureId(), true));
-
- return;
- }
-
- processEvictionRequest0(nodeId, req);
- }
- finally {
- unlockTopology();
- }
- }
- finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
- * @param nodeId Sender node ID.
- * @param req Request.
- */
- private void processEvictionRequest0(UUID nodeId, GridCacheEvictionRequest req) {
- if (log.isDebugEnabled())
- log.debug("Processing eviction request [node=" + nodeId + ", localNode=" + cctx.nodeId() +
- ", reqSize=" + req.entries().size() + ']');
-
- // Partition -> {{Key, Version}, ...}.
- // Group DHT and replicated cache entries by their partitions.
- Map<Integer, Collection<CacheEvictionEntry>> dhtEntries = new HashMap<>();
-
- Collection<CacheEvictionEntry> nearEntries = new LinkedList<>();
-
- for (CacheEvictionEntry e : req.entries()) {
- boolean near = e.near();
-
- if (!near) {
- // Lock is required.
- Collection<CacheEvictionEntry> col =
- F.addIfAbsent(dhtEntries, cctx.affinity().partition(e.key()),
- new LinkedList<CacheEvictionEntry>());
-
- assert col != null;
-
- col.add(e);
- }
- else
- nearEntries.add(e);
- }
-
- GridCacheEvictionResponse res = new GridCacheEvictionResponse(cctx.cacheId(), req.futureId());
-
- GridCacheVersion obsoleteVer = cctx.versions().next();
-
- // DHT and replicated cache entries.
- for (Map.Entry<Integer, Collection<CacheEvictionEntry>> e : dhtEntries.entrySet()) {
- int part = e.getKey();
-
- boolean locked = lockPartition(part); // Will return false if preloading is disabled.
-
- try {
- for (CacheEvictionEntry t : e.getValue()) {
- KeyCacheObject key = t.key();
- GridCacheVersion ver = t.version();
- boolean near = t.near();
-
- assert !near;
-
- boolean evicted = evictLocally(key, ver, near, obsoleteVer);
-
- if (log.isDebugEnabled())
- log.debug("Evicted key [key=" + key + ", ver=" + ver + ", near=" + near +
- ", evicted=" + evicted +']');
-
- if (locked && evicted)
- // Preloading is in progress, we need to save eviction info.
- saveEvictionInfo(key, ver, part);
-
- if (!evicted)
- res.addRejected(key);
- }
- }
- finally {
- if (locked)
- unlockPartition(part);
- }
- }
-
- // Near entries.
- for (CacheEvictionEntry t : nearEntries) {
- KeyCacheObject key = t.key();
- GridCacheVersion ver = t.version();
- boolean near = t.near();
-
- assert near;
-
- boolean evicted = evictLocally(key, ver, near, obsoleteVer);
-
- if (log.isDebugEnabled())
- log.debug("Evicted key [key=" + key + ", ver=" + ver + ", near=" + near +
- ", evicted=" + evicted +']');
-
- if (!evicted)
- res.addRejected(key);
- }
-
- sendEvictionResponse(nodeId, res);
- }
-
- /**
* @param nodeId Node ID.
* @param res Response.
*/
@@ -659,49 +314,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
}
/**
- * @param key Key to evict.
- * @param ver Entry version on initial node.
- * @param near {@code true} if entry should be evicted from near cache.
- * @param obsoleteVer Obsolete version.
- * @return {@code true} if evicted successfully, {@code false} if could not be evicted.
- */
- private boolean evictLocally(KeyCacheObject key,
- final GridCacheVersion ver,
- boolean near,
- GridCacheVersion obsoleteVer)
- {
- assert key != null;
- assert ver != null;
- assert obsoleteVer != null;
- assert evictSyncAgr;
- assert !cctx.isNear() || cctx.isReplicated();
-
- if (log.isDebugEnabled())
- log.debug("Evicting key locally [key=" + key + ", ver=" + ver + ", obsoleteVer=" + obsoleteVer +
- ", localNode=" + cctx.localNode() + ']');
-
- GridCacheAdapter cache = near ? cctx.dht().near() : cctx.cache();
-
- GridCacheEntryEx entry = cache.peekEx(key);
-
- if (entry == null)
- return true;
-
- try {
- // If entry should be evicted from near cache it can be done safely
- // without any consistency risks. We don't use filter in this case.
- // If entry should be evicted from DHT cache, we do not compare versions
- // as well because versions may change outside the transaction.
- return evict0(cache, entry, obsoleteVer, null, false);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to evict entry on remote node [key=" + key + ", localNode=" + cctx.nodeId() + ']', e);
-
- return false;
- }
- }
-
- /**
* @param cache Cache from which to evict entry.
* @param entry Entry to evict.
* @param obsoleteVer Obsolete version.
@@ -762,9 +374,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
if (!loc) {
if (cctx.isNear())
return;
-
- if (evictSync)
- return;
}
GridCacheEntryEx e = txEntry.cached();
@@ -781,13 +390,10 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
}
notifyPolicy(e);
-
- if (evictSyncAgr)
- waitForEvictionFutures();
}
/** {@inheritDoc} */
- @Override public void touch(GridCacheEntryEx e, AffinityTopologyVersion topVer) {
+ @Override public void touch(GridCacheEntryEx e, AffinityTopologyVersion topVer) {
if (e.detached() || e.isInternal())
return;
@@ -802,18 +408,10 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
if (!plcEnabled)
return;
- // Don't track non-primary entries if evicts are synchronized.
- if (!cctx.isNear() && evictSync && !cctx.affinity().primaryByPartition(cctx.localNode(), e.partition(), topVer))
- return;
-
if (!enterBusy())
return;
try {
- // Wait for futures to finish.
- if (evictSyncAgr)
- waitForEvictionFutures();
-
if (log.isDebugEnabled())
log.debug("Touching entry [entry=" + e + ", localNode=" + cctx.nodeId() + ']');
@@ -825,39 +423,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
}
/**
- * @param e Entry for eviction policy notification.
- */
- private void touch0(GridCacheEntryEx e) {
- assert evictSyncAgr;
- assert plcEnabled;
-
- // Do not wait for futures here since only limited number
- // of entries can be passed to this method.
- notifyPolicy(e);
- }
-
- /**
- * @param entries Entries for eviction policy notification.
- */
- private void touchOnTopologyChange(Iterable<? extends GridCacheEntryEx> entries) {
- assert evictSync;
- assert plcEnabled;
-
- if (log.isDebugEnabled())
- log.debug("Touching entries [entries=" + entries + ", localNode=" + cctx.nodeId() + ']');
-
- for (GridCacheEntryEx e : entries) {
- if (e.key() instanceof GridCacheInternal)
- // Skip internal entry.
- continue;
-
- // Do not wait for futures here since only limited number
- // of entries can be passed to this method.
- notifyPolicy(e);
- }
- }
-
- /**
* Warns on first eviction.
*/
private void warnFirstEvict() {
@@ -870,16 +435,11 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
}
U.warn(log, "Evictions started (cache may have reached its capacity)." +
- " You may wish to increase 'maxSize' on eviction policy being used for cache: " + cctx.name(),
+ " You may wish to increase 'maxSize' on eviction policy being used for cache: " + cctx.name(),
"Evictions started (cache may have reached its capacity): " + cctx.name());
}
/** {@inheritDoc} */
- @Override public boolean evictSyncOrNearSync() {
- return evictSyncAgr;
- }
-
- /** {@inheritDoc} */
@Override public boolean evict(@Nullable GridCacheEntryEx entry, @Nullable GridCacheVersion obsoleteVer,
boolean explicit, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
if (entry == null)
@@ -892,50 +452,18 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
if (!cctx.isNear() && !explicit && !firstEvictWarn)
warnFirstEvict();
- if (evictSyncAgr) {
- assert !cctx.isNear(); // Make sure cache is not NEAR.
-
- if (cctx.affinity().backupsByKey(
- entry.key(),
- cctx.topology().topologyVersion()).contains(cctx.localNode()) &&
- evictSync)
- // Do not track backups if evicts are synchronized.
- return !explicit;
-
- try {
- if (!cctx.isAll(entry, filter))
- return false;
-
- if (entry.lockedByAny())
- return false;
-
- // Add entry to eviction queue.
- enqueue(entry, filter);
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Entry got removed while evicting [entry=" + entry +
- ", localNode=" + cctx.nodeId() + ']');
- }
- }
- else {
- if (obsoleteVer == null)
- obsoleteVer = cctx.versions().next();
-
- // Do not touch entry if not evicted:
- // 1. If it is call from policy, policy tracks it on its own.
- // 2. If it is explicit call, entry is touched on tx commit.
- return evict0(cctx.cache(), entry, obsoleteVer, filter, explicit);
- }
+ if (obsoleteVer == null)
+ obsoleteVer = cctx.versions().next();
- return true;
+ // Do not touch entry if not evicted:
+ // 1. If it is call from policy, policy tracks it on its own.
+ // 2. If it is explicit call, entry is touched on tx commit.
+ return evict0(cctx.cache(), entry, obsoleteVer, filter, explicit);
}
/** {@inheritDoc} */
@Override public void batchEvict(Collection<?> keys, @Nullable GridCacheVersion obsoleteVer)
throws IgniteCheckedException {
- assert !evictSyncAgr;
-
List<GridCacheEntryEx> locked = new ArrayList<>(keys.size());
Set<GridCacheEntryEx> notRmv = null;
@@ -1005,7 +533,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
}
finally {
// Unlock entries in reverse order.
- for (ListIterator<GridCacheEntryEx> it = locked.listIterator(locked.size()); it.hasPrevious();) {
+ for (ListIterator<GridCacheEntryEx> it = locked.listIterator(locked.size()); it.hasPrevious(); ) {
GridCacheEntryEx e = it.previous();
GridUnsafe.monitorExit(e);
@@ -1031,56 +559,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
}
/**
- * Enqueues entry for synchronized eviction.
- *
- * @param entry Entry.
- * @param filter Filter.
- * @throws GridCacheEntryRemovedException If entry got removed.
- */
- private void enqueue(GridCacheEntryEx entry, CacheEntryPredicate[] filter)
- throws GridCacheEntryRemovedException {
- Node<EvictionInfo> node = entry.meta(META_KEY);
-
- if (node == null) {
- node = bufEvictQ.addLastx(new EvictionInfo(entry, entry.version(), filter));
-
- if (entry.putMetaIfAbsent(META_KEY, node) != null)
- // Was concurrently added, need to clear it from queue.
- bufEvictQ.unlinkx(node);
- else if (log.isDebugEnabled())
- log.debug("Added entry to eviction queue: " + entry);
- }
- }
-
- /**
- * Checks eviction queue.
- */
- private void checkEvictionQueue() {
- int maxSize = maxQueueSize();
-
- int bufSize = bufEvictQ.sizex();
-
- if (bufSize >= maxSize) {
- if (log.isDebugEnabled())
- log.debug("Processing eviction queue: " + bufSize);
-
- Collection<EvictionInfo> evictInfos = new ArrayList<>(bufSize);
-
- for (int i = 0; i < bufSize; i++) {
- EvictionInfo info = bufEvictQ.poll();
-
- if (info == null)
- break;
-
- evictInfos.add(info);
- }
-
- if (!evictInfos.isEmpty())
- addToCurrentFuture(evictInfos);
- }
- }
-
- /**
* @return Max queue size.
*/
private int maxQueueSize() {
@@ -1093,271 +571,29 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
}
/**
- * Processes eviction queue (sends required requests, etc.).
- *
- * @param evictInfos Eviction information to create future with.
+ * @param info Eviction info.
+ * @return Version aware filter.
*/
- private void addToCurrentFuture(Collection<EvictionInfo> evictInfos) {
- assert !evictInfos.isEmpty();
-
- while (true) {
- EvictionFuture fut = curEvictFut.get();
-
- if (fut == null) {
- curEvictFut.compareAndSet(null, new EvictionFuture());
-
- continue;
- }
-
- if (fut.prepareLock()) {
- boolean added;
-
- try {
- added = fut.add(evictInfos);
- }
- finally {
- fut.prepareUnlock();
- }
+ private CacheEntryPredicate[] versionFilter(final EvictionInfo info) {
+ // If version has changed since we started the whole process
+ // then we should not evict entry.
+ return new CacheEntryPredicate[] {
+ new CacheEntryPredicateAdapter() {
+ @Override public boolean apply(GridCacheEntryEx e) {
+ try {
+ GridCacheVersion ver = e.version();
- if (added) {
- if (fut.prepare()) {
- // Thread that prepares future should remove it and install listener.
- curEvictFut.compareAndSet(fut, null);
-
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- if (!enterBusy()) {
- if (log.isDebugEnabled())
- log.debug("Will not notify eviction future completion (grid is stopping): " +
- f);
-
- return;
- }
-
- try {
- AffinityTopologyVersion topVer = lockTopology();
-
- try {
- onFutureCompleted((EvictionFuture)f, topVer);
- }
- finally {
- unlockTopology();
- }
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- });
+ return info.version().equals(ver) && F.isAll(info.filter());
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ return false;
}
-
- break;
}
- else
- // Infos were not added, create another future for next iteration.
- curEvictFut.compareAndSet(fut, new EvictionFuture());
- }
- else
- // Future has not been locked, create another future for next iteration.
- curEvictFut.compareAndSet(fut, new EvictionFuture());
- }
+ }};
}
/**
- * @param fut Completed eviction future.
- * @param topVer Topology version on future complete.
- */
- private void onFutureCompleted(EvictionFuture fut, AffinityTopologyVersion topVer) {
- if (!enterBusy())
- return;
-
- try {
- IgniteBiTuple<Collection<EvictionInfo>, Collection<EvictionInfo>> t;
-
- try {
- t = fut.get();
- }
- catch (IgniteFutureCancelledCheckedException ignored) {
- assert false : "Future has been cancelled, but manager is not stopping: " + fut;
-
- return;
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Eviction future finished with error (all entries will be touched): " + fut, e);
-
- if (plcEnabled) {
- for (EvictionInfo info : fut.entries())
- touch0(info.entry());
- }
-
- return;
- }
-
- // Check if topology version is different.
- if (!fut.topologyVersion().equals(topVer)) {
- if (log.isDebugEnabled())
- log.debug("Topology has changed, all entries will be touched: " + fut);
-
- if (plcEnabled) {
- for (EvictionInfo info : fut.entries())
- touch0(info.entry());
- }
-
- return;
- }
-
- // Evict remotely evicted entries.
- GridCacheVersion obsoleteVer = null;
-
- Collection<EvictionInfo> evictedEntries = t.get1();
-
- for (EvictionInfo info : evictedEntries) {
- GridCacheEntryEx entry = info.entry();
-
- try {
- // Remove readers on which the entry was evicted.
- for (IgniteBiTuple<ClusterNode, Long> r : fut.evictedReaders(entry.key())) {
- UUID readerId = r.get1().id();
- Long msgId = r.get2();
-
- ((GridDhtCacheEntry)entry).removeReader(readerId, msgId);
- }
-
- if (obsoleteVer == null)
- obsoleteVer = cctx.versions().next();
-
- // Do not touch primary entries, if not evicted.
- // They will be touched within updating transactions.
- evict0(cctx.cache(), entry, obsoleteVer, versionFilter(info), false);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to evict entry [entry=" + entry +
- ", localNode=" + cctx.nodeId() + ']', e);
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Entry was concurrently removed while evicting [entry=" + entry +
- ", localNode=" + cctx.nodeId() + ']');
- }
- }
-
- Collection<EvictionInfo> rejectedEntries = t.get2();
-
- // Touch remotely rejected entries (only if policy is enabled).
- if (plcEnabled && !rejectedEntries.isEmpty()) {
- for (EvictionInfo info : rejectedEntries)
- touch0(info.entry());
- }
- }
- finally {
- busyLock.leaveBusy();
-
- // Signal on future completion.
- signal();
- }
- }
-
- /**
- * This method should be called when eviction future is processed
- * and unwind may continue.
- */
- private void signal() {
- futsCntLock.lock();
-
- try {
- // Avoid volatile read on assertion.
- int cnt = --activeFutsCnt;
-
- assert cnt >= 0 : "Invalid futures count: " + cnt;
-
- if (cnt < maxActiveFuts)
- futsCntCond.signalAll();
- }
- finally {
- futsCntLock.unlock();
- }
- }
-
- /**
- * @param info Eviction info.
- * @return Version aware filter.
- */
- private CacheEntryPredicate[] versionFilter(final EvictionInfo info) {
- // If version has changed since we started the whole process
- // then we should not evict entry.
- return new CacheEntryPredicate[]{new CacheEntryPredicateAdapter() {
- @Override public boolean apply(GridCacheEntryEx e) {
- try {
- GridCacheVersion ver = e.version();
-
- return info.version().equals(ver) && F.isAll(info.filter());
- }
- catch (GridCacheEntryRemovedException ignored) {
- return false;
- }
- }
- }};
- }
-
- /**
- * Gets a collection of nodes to send eviction requests to.
- *
- *
- * @param entry Entry.
- * @param topVer Topology version.
- * @return Tuple of two collections: dht (in case of partitioned cache) nodes
- * and readers (empty for replicated cache).
- * @throws GridCacheEntryRemovedException If entry got removed during method
- * execution.
- */
- @SuppressWarnings( {"IfMayBeConditional"})
- private IgniteBiTuple<Collection<ClusterNode>, Collection<ClusterNode>> remoteNodes(GridCacheEntryEx entry,
- AffinityTopologyVersion topVer)
- throws GridCacheEntryRemovedException {
- assert entry != null;
-
- assert cctx.config().getCacheMode() != LOCAL;
-
- Collection<ClusterNode> backups;
-
- if (evictSync)
- backups = F.view(cctx.dht().topology().nodes(entry.partition(), topVer), F0.notEqualTo(cctx.localNode()));
- else
- backups = Collections.emptySet();
-
- Collection<ClusterNode> readers;
-
- if (nearSync) {
- readers = F.transform(((GridDhtCacheEntry)entry).readers(), new C1<UUID, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(UUID nodeId) {
- return cctx.node(nodeId);
- }
- });
- }
- else
- readers = Collections.emptySet();
-
- return new IgnitePair<>(backups, readers);
- }
-
- /** {@inheritDoc} */
- @Override public void unwind() {
- if (!evictSyncAgr)
- return;
-
- if (!enterBusy())
- return;
-
- try {
- checkEvictionQueue();
- }
- finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
- * @param e Entry to notify eviction policy.
+ * @param e Entry to notify eviction policy.
*/
@SuppressWarnings({"IfMayBeConditional", "RedundantIfStatement"})
private void notifyPolicy(GridCacheEntryEx e) {
@@ -1373,40 +609,11 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
}
/**
- *
- */
- @SuppressWarnings("TooBroadScope")
- private void waitForEvictionFutures() {
- if (activeFutsCnt >= maxActiveFuts) {
- boolean interrupted = false;
-
- futsCntLock.lock();
-
- try {
- while(!stopping && activeFutsCnt >= maxActiveFuts) {
- try {
- futsCntCond.await(2000, MILLISECONDS);
- }
- catch (InterruptedException ignored) {
- interrupted = true;
- }
- }
- }
- finally {
- futsCntLock.unlock();
-
- if (interrupted)
- Thread.currentThread().interrupt();
- }
- }
- }
-
- /**
* Prints out eviction stats.
*/
public void printStats() {
X.println("Eviction stats [igniteInstanceName=" + cctx.igniteInstanceName() +
- ", cache=" + cctx.cache().name() + ", buffEvictQ=" + bufEvictQ.sizex() + ']');
+ ", cache=" + cctx.cache().name() + ']');
}
/** {@inheritDoc} */
@@ -1414,99 +621,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
X.println(">>> ");
X.println(">>> Eviction manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() +
", cache=" + cctx.name() + ']');
- X.println(">>> buffEvictQ size: " + bufEvictQ.sizex());
- X.println(">>> futsSize: " + futs.size());
- X.println(">>> futsCreated: " + idGen.get());
- }
-
- /**
- *
- */
- private class BackupWorker extends GridWorker {
- /** */
- private final BlockingQueue<DiscoveryEvent> evts = new LinkedBlockingQueue<>();
-
- /** */
- private final Collection<Integer> primaryParts = new HashSet<>();
-
- /**
- *
- */
- private BackupWorker() {
- super(cctx.igniteInstanceName(), "cache-eviction-backup-worker", GridCacheEvictionManager.this.log);
-
- assert plcEnabled;
- }
-
- /**
- * @param evt New event.
- */
- void addEvent(DiscoveryEvent evt) {
- assert evt != null;
-
- evts.add(evt);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- try {
- assert !cctx.isNear() && evictSync;
-
- ClusterNode loc = cctx.localNode();
-
- AffinityTopologyVersion initTopVer =
- new AffinityTopologyVersion(cctx.discovery().localJoinEvent().topologyVersion(), 0);
-
- AffinityTopologyVersion cacheStartVer = cctx.startTopologyVersion();
-
- if (cacheStartVer != null && cacheStartVer.compareTo(initTopVer) > 0)
- initTopVer = cacheStartVer;
-
- // Initialize.
- primaryParts.addAll(cctx.affinity().primaryPartitions(cctx.localNodeId(), initTopVer));
-
- while (!isCancelled()) {
- DiscoveryEvent evt = evts.take();
-
- if (log.isDebugEnabled())
- log.debug("Processing event: " + evt);
-
- AffinityTopologyVersion topVer = new AffinityTopologyVersion(evt.topologyVersion());
-
- // Remove partitions that are no longer primary.
- for (Iterator<Integer> it = primaryParts.iterator(); it.hasNext();) {
- if (!evts.isEmpty())
- break;
-
- if (!cctx.affinity().primaryByPartition(loc, it.next(), topVer))
- it.remove();
- }
-
- // Move on to next event.
- if (!evts.isEmpty())
- continue;
-
- for (GridDhtLocalPartition part : cctx.topology().localPartitions()) {
- if (!evts.isEmpty())
- break;
-
- if (part.primary(topVer) && primaryParts.add(part.id())) {
- if (log.isDebugEnabled())
- log.debug("Touching partition entries: " + part);
-
- touchOnTopologyChange(part.allEntries());
- }
- }
- }
- }
- catch (InterruptedException ignored) {
- // No-op.
- }
- catch (IgniteException e) {
- if (!e.hasCause(InterruptedException.class))
- throw e;
- }
- }
}
/**
@@ -1563,540 +677,4 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements
return S.toString(EvictionInfo.class, this);
}
}
-
- /**
- * Future for synchronized eviction. Result is a tuple: {evicted entries, rejected entries}.
- */
- private class EvictionFuture extends GridFutureAdapter<IgniteBiTuple<Collection<EvictionInfo>,
- Collection<EvictionInfo>>> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final long id = idGen.incrementAndGet();
-
- /** */
- private ConcurrentLinkedDeque8<EvictionInfo> evictInfos = new ConcurrentLinkedDeque8<>();
-
- /** */
- private final ConcurrentMap<KeyCacheObject, EvictionInfo> entries = new ConcurrentHashMap8<>();
-
- /** */
- private final ConcurrentMap<KeyCacheObject, Collection<ClusterNode>> readers =
- new ConcurrentHashMap8<>();
-
- /** */
- private final Collection<EvictionInfo> evictedEntries = new GridConcurrentHashSet<>();
-
- /** */
- private final ConcurrentMap<KeyCacheObject, EvictionInfo> rejectedEntries = new ConcurrentHashMap8<>();
-
- /** Request map. */
- private final ConcurrentMap<UUID, GridCacheEvictionRequest> reqMap =
- new ConcurrentHashMap8<>();
-
- /** Response map. */
- private final ConcurrentMap<UUID, GridCacheEvictionResponse> resMap =
- new ConcurrentHashMap8<>();
-
- /** To make sure that future is completing within a single thread. */
- private final AtomicBoolean finishPrepare = new AtomicBoolean();
-
- /** Lock to use when future is being initialized. */
- @GridToStringExclude
- private final ReadWriteLock prepareLock = new ReentrantReadWriteLock();
-
- /** To make sure that future is completing within a single thread. */
- private final AtomicBoolean completing = new AtomicBoolean();
-
- /** Lock to use after future is initialized. */
- @GridToStringExclude
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- /** Object to force future completion on elapsing eviction timeout. */
- @GridToStringExclude
- private GridTimeoutObject timeoutObj;
-
- /** Topology version future is processed on. */
- private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
-
- /**
- * @return {@code True} if prepare lock was acquired.
- */
- boolean prepareLock() {
- return prepareLock.readLock().tryLock();
- }
-
- /**
- *
- */
- void prepareUnlock() {
- prepareLock.readLock().unlock();
- }
-
- /**
- * @param infos Infos to add.
- * @return {@code False} if entries were not added due to capacity restrictions.
- */
- boolean add(Collection<EvictionInfo> infos) {
- assert infos != null && !infos.isEmpty();
-
- if (evictInfos.sizex() > maxQueueSize())
- return false;
-
- evictInfos.addAll(infos);
-
- return true;
- }
-
- /**
- * @return {@code True} if future has been prepared by this call.
- */
- @SuppressWarnings("LockAcquiredButNotSafelyReleased")
- boolean prepare() {
- if (evictInfos.sizex() >= maxQueueSize() && finishPrepare.compareAndSet(false, true)) {
- // Lock will never be released intentionally.
- prepareLock.writeLock().lock();
-
- futsCntLock.lock();
-
- try {
- activeFutsCnt++;
- }
- finally {
- futsCntLock.unlock();
- }
-
- // Put future in map.
- futs.put(id, this);
-
- prepare0();
-
- return true;
- }
-
- return false;
- }
-
- /**
- * Prepares future (sends all required requests).
- */
- private void prepare0() {
- if (log.isDebugEnabled())
- log.debug("Preparing eviction future [futId=" + id + ", localNode=" + cctx.nodeId() +
- ", infos=" + evictInfos + ']');
-
- assert evictInfos != null && !evictInfos.isEmpty();
-
- topVer = lockTopology();
-
- try {
- Collection<EvictionInfo> locals = null;
-
- for (EvictionInfo info : evictInfos) {
- // Queue node may have been stored in entry metadata concurrently, but we don't care
- // about it since we are currently processing this entry.
- Node<EvictionInfo> queueNode = info.entry().removeMeta(META_KEY);
-
- if (queueNode != null)
- bufEvictQ.unlinkx(queueNode);
-
- IgniteBiTuple<Collection<ClusterNode>, Collection<ClusterNode>> tup;
-
- try {
- tup = remoteNodes(info.entry(), topVer);
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Entry got removed while preparing eviction future (ignoring) [entry=" +
- info.entry() + ", nodeId=" + cctx.nodeId() + ']');
-
- continue;
- }
-
- Collection<ClusterNode> entryReaders =
- F.addIfAbsent(readers, info.entry().key(), new GridConcurrentHashSet<ClusterNode>());
-
- assert entryReaders != null;
-
- // Add entry readers so that we could remove them right before local eviction.
- entryReaders.addAll(tup.get2());
-
- Collection<ClusterNode> nodes = F.concat(true, tup.get1(), tup.get2());
-
- if (!nodes.isEmpty()) {
- entries.put(info.entry().key(), info);
-
- // There are remote participants.
- for (ClusterNode node : nodes) {
- GridCacheEvictionRequest req = F.addIfAbsent(reqMap, node.id(),
- new GridCacheEvictionRequest(cctx.cacheId(), id, evictInfos.size(), topVer,
- cctx.deploymentEnabled()));
-
- assert req != null;
-
- req.addKey(info.entry().key(), info.version(), entryReaders.contains(node));
- }
- }
- else {
- if (locals == null)
- locals = new HashSet<>(evictInfos.size(), 1.0f);
-
- // There are no remote participants, need to keep the entry as local.
- locals.add(info);
- }
- }
-
- if (locals != null) {
- // Evict entries without remote participant nodes immediately.
- GridCacheVersion obsoleteVer = cctx.versions().next();
-
- for (EvictionInfo info : locals) {
- if (log.isDebugEnabled())
- log.debug("Evicting key without remote participant nodes: " + info);
-
- try {
- // Touch primary entry (without backup nodes) if not evicted
- // to keep tracking.
- if (!evict0(cctx.cache(), info.entry(), obsoleteVer, versionFilter(info), false) &&
- plcEnabled)
- touch0(info.entry());
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to evict entry: " + info.entry(), e);
- }
- }
- }
-
- // If there were only local entries.
- if (entries.isEmpty()) {
- complete(false);
-
- return;
- }
- }
- finally {
- unlockTopology();
- }
-
- // Send eviction requests.
- for (Map.Entry<UUID, GridCacheEvictionRequest> e : reqMap.entrySet()) {
- UUID nodeId = e.getKey();
-
- GridCacheEvictionRequest req = e.getValue();
-
- if (log.isDebugEnabled())
- log.debug("Sending eviction request [node=" + nodeId + ", req=" + req + ']');
-
- try {
- cctx.io().send(nodeId, req, cctx.ioPolicy());
- }
- catch (ClusterTopologyCheckedException ignored) {
- // Node left the topology.
- onNodeLeft(nodeId);
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to send eviction request to node [node=" + nodeId + ", req=" + req + ']', ex);
-
- rejectEntries(nodeId);
- }
- }
-
- registerTimeoutObject();
- }
-
- /**
- *
- */
- private void registerTimeoutObject() {
- // Check whether future has not been completed yet.
- if (lock.readLock().tryLock()) {
- try {
- timeoutObj = new GridTimeoutObjectAdapter(cctx.config().getEvictSynchronizedTimeout()) {
- @Override public void onTimeout() {
- complete(true);
- }
- };
-
- cctx.time().addTimeoutObject(timeoutObj);
- }
- finally {
- lock.readLock().unlock();
- }
- }
- }
-
- /**
- * @return Future ID.
- */
- long id() {
- return id;
- }
-
- /**
- * @return Topology version.
- */
- AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Keys to readers mapping.
- */
- Map<KeyCacheObject, Collection<ClusterNode>> readers() {
- return readers;
- }
-
- /**
- * @return All entries associated with future that should be evicted (or rejected).
- */
- Collection<EvictionInfo> entries() {
- return entries.values();
- }
-
- /**
- * Reject all entries on behalf of specified node.
- *
- * @param nodeId Node ID.
- */
- private void rejectEntries(UUID nodeId) {
- assert nodeId != null;
-
- if (lock.readLock().tryLock()) {
- try {
- if (log.isDebugEnabled())
- log.debug("Rejecting entries for node: " + nodeId);
-
- GridCacheEvictionRequest req = reqMap.remove(nodeId);
-
- for (CacheEvictionEntry t : req.entries()) {
- EvictionInfo info = entries.get(t.key());
-
- assert info != null;
-
- rejectedEntries.put(t.key(), info);
- }
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- checkDone();
- }
-
- /**
- * @param nodeId Node id that left the topology.
- */
- void onNodeLeft(UUID nodeId) {
- assert nodeId != null;
-
- if (lock.readLock().tryLock()) {
- try {
- // Stop waiting response from this node.
- reqMap.remove(nodeId);
-
- resMap.remove(nodeId);
- }
- finally {
- lock.readLock().unlock();
- }
-
- checkDone();
- }
- }
-
- /**
- * @param nodeId Sender node ID.
- * @param res Response.
- */
- void onResponse(UUID nodeId, GridCacheEvictionResponse res) {
- assert nodeId != null;
- assert res != null;
-
- if (lock.readLock().tryLock()) {
- try {
- if (log.isDebugEnabled())
- log.debug("Entered to eviction future onResponse() [fut=" + this + ", node=" + nodeId +
- ", res=" + res + ']');
-
- ClusterNode node = cctx.node(nodeId);
-
- if (node != null)
- resMap.put(nodeId, res);
- else
- // Sender node left grid.
- reqMap.remove(nodeId);
- }
- finally {
- lock.readLock().unlock();
- }
-
- if (res.evictError())
- // Complete future, since there was a class loading error on at least one node.
- complete(false);
- else
- checkDone();
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Ignored eviction response [fut=" + this + ", node=" + nodeId + ", res=" + res + ']');
- }
- }
-
- /**
- *
- */
- private void checkDone() {
- if (reqMap.isEmpty() || resMap.keySet().containsAll(reqMap.keySet()))
- complete(false);
- }
-
- /**
- * Completes future.
- *
- * @param timedOut {@code True} if future is being forcibly completed on timeout.
- */
- @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
- private void complete(boolean timedOut) {
- if (completing.compareAndSet(false, true)) {
- // Lock will never be released intentionally.
- lock.writeLock().lock();
-
- futs.remove(id);
-
- if (timeoutObj != null && !timedOut)
- // If future is timed out, corresponding object is already removed.
- cctx.time().removeTimeoutObject(timeoutObj);
-
- if (log.isDebugEnabled())
- log.debug("Building eviction future result [fut=" + this + ", timedOut=" + timedOut + ']');
-
- boolean err = false;
-
- for (GridCacheEvictionResponse res : resMap.values()) {
- if (res.evictError()) {
- err = true;
-
- break;
- }
- }
-
- if (err) {
- Collection<UUID> ids = F.view(resMap.keySet(), new P1<UUID>() {
- @Override public boolean apply(UUID e) {
- return resMap.get(e).evictError();
- }
- });
-
- assert !ids.isEmpty();
-
- U.warn(log, "Remote node(s) failed to process eviction request " +
- "due to topology changes " +
- "(some backup or remote values maybe lost): " + ids);
- }
-
- if (timedOut)
- U.warn(log, "Timed out waiting for eviction future " +
- "(consider changing 'evictSynchronousTimeout' and 'evictSynchronousConcurrencyLevel' " +
- "configuration parameters).");
-
- if (err || timedOut) {
- // Future has not been completed successfully, all entries should be rejected.
- assert evictedEntries.isEmpty();
-
- rejectedEntries.putAll(entries);
- }
- else {
- // Copy map to filter remotely rejected entries,
- // as they will be touched within corresponding txs.
- Map<KeyCacheObject, EvictionInfo> rejectedEntries0 = new HashMap<>(rejectedEntries);
-
- // Future has been completed successfully - build result.
- for (EvictionInfo info : entries.values()) {
- KeyCacheObject key = info.entry().key();
-
- if (rejectedEntries0.containsKey(key))
- // Was already rejected.
- continue;
-
- boolean rejected = false;
-
- for (GridCacheEvictionResponse res : resMap.values()) {
- if (res.rejectedKeys().contains(key)) {
- // Modify copied map.
- rejectedEntries0.put(key, info);
-
- rejected = true;
-
- break;
- }
- }
-
- if (!rejected)
- evictedEntries.add(info);
- }
- }
-
- // Pass entries that were rejected due to topology changes
- // or due to timeout or class loading issues.
- // Remotely rejected entries will be touched within corresponding txs.
- onDone(F.t(evictedEntries, rejectedEntries.values()));
- }
- }
-
- /**
- * @param key Key.
- * @return Reader nodes on which given key was evicted.
- */
- Collection<IgniteBiTuple<ClusterNode, Long>> evictedReaders(KeyCacheObject key) {
- Collection<ClusterNode> mappedReaders = readers.get(key);
-
- if (mappedReaders == null)
- return Collections.emptyList();
-
- Collection<IgniteBiTuple<ClusterNode, Long>> col = new LinkedList<>();
-
- for (Map.Entry<UUID, GridCacheEvictionResponse> e : resMap.entrySet()) {
- ClusterNode node = cctx.node(e.getKey());
-
- // If node has left or response did not arrive from near node
- // then just skip it.
- if (node == null || !mappedReaders.contains(node))
- continue;
-
- GridCacheEvictionResponse res = e.getValue();
-
- if (!res.rejectedKeys().contains(key))
- col.add(F.t(node, res.messageId()));
- }
-
- return col;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("LockAcquiredButNotSafelyReleased")
- @Override public boolean cancel() {
- if (completing.compareAndSet(false, true)) {
- // Lock will never be released intentionally.
- lock.writeLock().lock();
-
- if (timeoutObj != null)
- cctx.time().removeTimeoutObject(timeoutObj);
-
- boolean b = onCancelled();
-
- assert b;
-
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(EvictionFuture.class, this);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/65d816f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 022117b..c82e864 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -920,14 +920,6 @@ public class GridCacheUtils {
public static void unwindEvicts(GridCacheContext ctx) {
assert ctx != null;
- ctx.evicts().unwind();
-
- if (ctx.isNear()) {
- GridCacheContext dhtCtx = ctx.near().dht().context();
-
- dhtCtx.evicts().unwind();
- }
-
ctx.ttl().expire();
}