You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/13 12:00:54 UTC
[3/3] ignite git commit: 1093
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eae26a87
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eae26a87
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eae26a87
Branch: refs/heads/ignite-1093-2
Commit: eae26a874e35e1ee817cef4c7a509a326b9fde4e
Parents: 51249dc
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Oct 13 13:00:34 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Oct 13 13:00:34 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCachePreloader.java | 9 ++-
.../cache/GridCachePreloaderAdapter.java | 6 ++
.../distributed/dht/GridDhtLocalPartition.java | 73 +++++---------------
.../dht/preloader/GridDhtPreloader.java | 46 +++++++++++-
4 files changed, 77 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/eae26a87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index b2c2a2b..08aec71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.UUID;
-import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -158,4 +158,11 @@ public interface GridCachePreloader {
* @param d Demand message.
*/
public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d);
+
+ /**
+ * Requests asynchronous eviction of the given partition.
+ *
+ * @param part Partition to evict.
+ */
+ public void evictPartitionAsync(GridDhtLocalPartition part);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eae26a87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 6ffb2bf..be616f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -25,6 +25,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -162,4 +163,9 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
Collection<String> caches, int cnt) throws IgniteCheckedException {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
+ // No-op in this base adapter; GridDhtPreloader overrides it to queue the partition for eviction.
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eae26a87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 15eede0..5bb9930 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -50,13 +49,11 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
import org.jsr166.LongAdder8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
@@ -80,12 +77,6 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
/** Logger. */
private static volatile IgniteLogger log;
- /** */
- private static final Queue<GridDhtLocalPartition> partitionsToEvict = new ConcurrentLinkedDeque8<>();
-
- /** */
- private static final AtomicReference<Integer> partitionsEvictionOwning = new AtomicReference<>(0);
-
/** Partition ID. */
private final int id;
@@ -292,7 +283,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
}
// Attempt to evict.
- tryEvict();
+ cctx.preloader().evictPartitionAsync(this);
}
/**
@@ -417,7 +408,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
// Decrement reservations.
if (state.compareAndSet(s, s, reservations, --reservations)) {
- tryEvict();
+ cctx.preloader().evictPartitionAsync(this);
break;
}
@@ -505,7 +496,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
clearDeferredDeletes();
}
else {
- tryEvict();
+ cctx.preloader().evictPartitionAsync(this);
}
}
@@ -524,58 +515,30 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
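/**
* Attempts to evict this partition. Does nothing unless the partition is in RENTING state,
* has no reservations and is not group-reserved; otherwise clears its entries and, if the
* map is empty, moves the state to EVICTED and completes the rent future.
*/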
- void tryEvict() {
+ public void tryEvict() {
if (state.getReference() != RENTING || state.getStamp() != 0 || groupReserved())
return;
- partitionsToEvict.add(this);
-
- if (partitionsEvictionOwning.get() == 0) {
- cctx.closures().callLocalSafe(new GPC<Boolean>() {
- @Override public Boolean call() {
- while (true) {
- if (!partitionsEvictionOwning.compareAndSet(0, 1))
- return false;
-
- try {
- GridDhtLocalPartition part = partitionsToEvict.poll();
-
- if (part == null) {
- return false;
- }
-
- if (part.state.getReference() != EVICTED) {
- // Attempt to evict partition entries from cache.
- part.clearAll();
-
- if (part.map.isEmpty() && part.state.compareAndSet(RENTING, EVICTED, 0, 0)) {
- if (log.isDebugEnabled())
- log.debug("Evicted partition: " + this);
+ // Attempt to evict partition entries from cache.
+ clearAll();
- if (!GridQueryProcessor.isEnabled(part.cctx.config()))
- part.clearSwap();
+ if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) {
+ if (log.isDebugEnabled())
+ log.debug("Evicted partition: " + this);
- if (part.cctx.isDrEnabled())
- part.cctx.dr().partitionEvicted(id);
+ if (!GridQueryProcessor.isEnabled(cctx.config()))
+ clearSwap();
- part.cctx.dataStructures().onPartitionEvicted(id);
+ if (cctx.isDrEnabled())
+ cctx.dr().partitionEvicted(id);
- part.rent.onDone();
+ cctx.dataStructures().onPartitionEvicted(id);
- ((GridDhtPreloader)part.cctx.preloader()).onPartitionEvicted(part, true);
+ rent.onDone();
- part.clearDeferredDeletes();
- }
- }
- }
- finally {
- boolean res = partitionsEvictionOwning.compareAndSet(1, 0);
+ ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, true);
- assert res;
- }
- }
- }
- }, /*system pool*/ true);
+ clearDeferredDeletes();
}
}
@@ -616,7 +579,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
*
*/
void onUnlock() {
- tryEvict();
+ cctx.preloader().evictPartitionAsync(this);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/eae26a87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 01109c1..c99176e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -21,9 +21,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Queue;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
@@ -53,12 +54,14 @@ import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
@@ -104,6 +107,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** Demand lock. */
private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
+ /** */
+ private final Queue<GridDhtLocalPartition> partitionsToEvict = new ConcurrentLinkedDeque8<>();
+
+ /** */
+ private final AtomicReference<Integer> partitionsEvictionOwning = new AtomicReference<>(0);
+
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -719,6 +728,41 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
forceKeyFuts.remove(fut.futureId(), fut);
}
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Queues the partition and, if no task currently owns the eviction flag, submits a task to the
+     * system pool that calls {@link GridDhtLocalPartition#tryEvict()} on the queued partitions.
+     */
+    @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
+        partitionsToEvict.add(part);
+
+        if (partitionsEvictionOwning.compareAndSet(0, 1)) {
+            cctx.closures().callLocalSafe(new GPC<Boolean>() {
+                @Override public Boolean call() {
+                    // Ownership for the first pass was acquired by the thread that scheduled this task.
+                    boolean owner = true;
+
+                    do {
+                        // Another task took ownership after we released it; it will drain the queue.
+                        if (!owner && !partitionsEvictionOwning.compareAndSet(0, 1))
+                            return false;
+
+                        try {
+                            GridDhtLocalPartition next;
+
+                            while ((next = partitionsToEvict.poll()) != null)
+                                next.tryEvict();
+                        }
+                        finally {
+                            boolean res = partitionsEvictionOwning.compareAndSet(1, 0);
+
+                            assert res;
+
+                            owner = false;
+                        }
+                    }
+                    // A producer may have enqueued after the last poll but failed its own CAS while this
+                    // task still held ownership; re-check so that partition is not stranded in the queue.
+                    while (!partitionsToEvict.isEmpty());
+
+                    return false;
+                }
+            }, /*system pool*/ true);
+        }
+    }
+
/**
*
*/