You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/27 10:00:43 UTC
[5/5] ignite git commit: Ignite-1093
Ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a34a408b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a34a408b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a34a408b
Branch: refs/heads/ignite-1093-3
Commit: a34a408bf4736aca2879207037ba3bfe5d87de82
Parents: 1334e77
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Oct 27 12:00:13 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Oct 27 12:00:13 2015 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 50 +-
.../configuration/IgniteConfiguration.java | 30 +-
.../apache/ignite/internal/IgniteKernal.java | 10 +
.../org/apache/ignite/internal/IgnitionEx.java | 3 +
.../communication/GridIoMessageFactory.java | 10 +-
.../processors/cache/GridCacheIoManager.java | 9 +
.../processors/cache/GridCacheMapEntry.java | 38 +-
.../GridCachePartitionExchangeManager.java | 157 ++-
.../processors/cache/GridCachePreloader.java | 43 +-
.../cache/GridCachePreloaderAdapter.java | 35 +-
.../processors/cache/GridCacheProcessor.java | 54 +-
.../distributed/dht/GridDhtCacheEntry.java | 5 +-
.../distributed/dht/GridDhtLocalPartition.java | 60 +-
.../GridDhtPartitionDemandMessage.java | 9 +-
.../preloader/GridDhtPartitionDemandPool.java | 1192 ----------------
.../dht/preloader/GridDhtPartitionDemander.java | 1310 ++++++++++++++++++
.../dht/preloader/GridDhtPartitionSupplier.java | 999 +++++++++++++
.../GridDhtPartitionSupplyMessageV2.java | 404 ++++++
.../preloader/GridDhtPartitionSupplyPool.java | 555 --------
.../dht/preloader/GridDhtPreloader.java | 269 +++-
.../datastructures/DataStructuresProcessor.java | 3 +
.../processors/task/GridTaskWorker.java | 4 +-
.../ignite/internal/util/lang/GridTuple4.java | 2 +-
.../ignite/internal/util/nio/GridNioServer.java | 2 +
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
...eAtomicInvalidPartitionHandlingSelfTest.java | 3 +
.../GridCacheRebalancingAsyncSelfTest.java | 63 +
.../GridCacheRebalancingSyncSelfTest.java | 472 +++++++
.../GridCacheReplicatedPreloadSelfTest.java | 22 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 17 -
.../spi/discovery/tcp/TestTcpDiscoverySpi.java | 46 +
.../testframework/junits/GridAbstractTest.java | 3 +-
.../junits/common/GridCommonAbstractTest.java | 5 +-
.../testsuites/IgniteCacheTestSuite3.java | 5 +
.../tcp/GridOrderedMessageCancelSelfTest.java | 18 +-
35 files changed, 3946 insertions(+), 1963 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 6ac2b64..4012792 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -73,6 +73,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Default rebalance timeout (ms).*/
public static final long DFLT_REBALANCE_TIMEOUT = 10000;
+ /** Default rebalance batches count. */
+ public static final long DFLT_REBALANCE_BATCHES_COUNT = 2;
+
/** Time in milliseconds to wait between rebalance messages to avoid overloading CPU. */
public static final long DFLT_REBALANCE_THROTTLE = 0;
@@ -256,6 +259,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Off-heap memory size. */
private long offHeapMaxMem = DFLT_OFFHEAP_MEMORY;
+ /** Rebalance batches count. */
+ private long rebalanceBatchesCount = DFLT_REBALANCE_BATCHES_COUNT;
+
/** */
private boolean swapEnabled = DFLT_SWAP_ENABLED;
@@ -396,6 +402,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
rebalanceDelay = cc.getRebalanceDelay();
rebalanceOrder = cc.getRebalanceOrder();
rebalancePoolSize = cc.getRebalanceThreadPoolSize();
+ rebalanceBatchesCount = cc.getRebalanceBatchesCount();
rebalanceTimeout = cc.getRebalanceTimeout();
rebalanceThrottle = cc.getRebalanceThrottle();
readFromBackup = cc.isReadFromBackup();
@@ -1033,10 +1040,10 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
* {@link CacheRebalanceMode#SYNC SYNC} or {@link CacheRebalanceMode#ASYNC ASYNC} rebalance modes only.
* <p/>
* If cache rebalance order is positive, rebalancing for this cache will be started only when rebalancing for
- * all caches with smaller rebalance order (except caches with rebalance order {@code 0}) will be completed.
+ * all caches with smaller rebalance order will be completed.
* <p/>
* Note that cache with order {@code 0} does not participate in ordering. This means that cache with
- * rebalance order {@code 1} will never wait for any other caches. All caches with order {@code 0} will
+ * rebalance order {@code 0} will never wait for any other caches. All caches with order {@code 0} will
* be rebalanced right away concurrently with each other and ordered rebalance processes.
* <p/>
* If not set, cache order is 0, i.e. rebalancing is not ordered.
@@ -1079,7 +1086,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
* @return {@code this} for chaining.
*/
public CacheConfiguration<K, V> setRebalanceBatchSize(int rebalanceBatchSize) {
- this.rebalanceBatchSize = rebalanceBatchSize;
+ this.rebalanceBatchSize = Math.max(1, rebalanceBatchSize);
return this;
}
@@ -1269,11 +1276,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
return this;
}
+ @Deprecated
/**
- * Gets size of rebalancing thread pool. Note that size serves as a hint and implementation
- * may create more threads for rebalancing than specified here (but never less threads).
- * <p>
- * Default value is {@link #DFLT_REBALANCE_THREAD_POOL_SIZE}.
+ * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
*
* @return Size of rebalancing thread pool.
*/
@@ -1281,9 +1286,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
return rebalancePoolSize;
}
+ @Deprecated
/**
- * Sets size of rebalancing thread pool. Note that size serves as a hint and implementation may create more threads
- * for rebalancing than specified here (but never less threads).
+ * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
*
* @param rebalancePoolSize Size of rebalancing thread pool.
* @return {@code this} for chaining.
@@ -1773,6 +1778,33 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
}
/**
+ * To gain better rebalancing performance supplier node can provide more than one batch at start and provide
+ * one new to each next demand request.
+ *
+ * Gets number of batches generated by supply node at rebalancing start.
+ *
+ * @return batches count
+ */
+ public long getRebalanceBatchesCount() {
+ return rebalanceBatchesCount;
+ }
+
+ /**
+ * To gain better rebalancing performance supplier node can provide more than one batch at start and provide
+ * one new to each next demand request.
+ *
+ * Sets number of batches generated by supply node at rebalancing start.
+ *
+ * @param rebalanceBatchesCnt batches count.
+ * @return {@code this} for chaining.
+ */
+ public CacheConfiguration<K, V> setRebalanceBatchesCount(long rebalanceBatchesCnt) {
+ this.rebalanceBatchesCount = rebalanceBatchesCnt;
+
+ return this;
+ }
+
+ /**
* Gets cache store session listener factories.
*
* @return Cache store session listener factories.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index ecae356..26145e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -149,6 +149,9 @@ public class IgniteConfiguration {
/** Default keep alive time for public thread pool. */
public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
+ /** Default limit of threads used at rebalance. */
+ public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 2;
+
/** Default max queue capacity of public thread pool. */
public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
@@ -354,6 +357,9 @@ public class IgniteConfiguration {
/** Client mode flag. */
private Boolean clientMode;
+ /** Rebalance thread pool size. */
+ private int rebalanceThreadPoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE;
+
/** Transactions configuration. */
private TransactionConfiguration txCfg = new TransactionConfiguration();
@@ -518,6 +524,7 @@ public class IgniteConfiguration {
utilityCachePoolSize = cfg.getUtilityCacheThreadPoolSize();
waitForSegOnStart = cfg.isWaitForSegmentOnStart();
warmupClos = cfg.getWarmupClosure();
+ rebalanceThreadPoolSize = cfg.getRebalanceThreadPoolSize();
}
/**
@@ -1331,6 +1338,27 @@ public class IgniteConfiguration {
}
/**
+ * Gets the max count of threads that can be used at rebalancing.
+ * Minimum is 1.
+ * @return count.
+ */
+ public int getRebalanceThreadPoolSize(){
+ return Math.max(1, rebalanceThreadPoolSize);
+ }
+
+ /**
+ * Sets the max count of threads that can be used at rebalancing.
+ * Minimum is 1.
+ * @param size Size.
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setRebalanceThreadPoolSize(int size){
+ this.rebalanceThreadPoolSize = size;
+
+ return this;
+ }
+
+ /**
* Returns a collection of life-cycle beans. These beans will be automatically
* notified of grid life-cycle events. Use life-cycle beans whenever you
* want to perform certain logic before and after grid startup and stopping
@@ -2383,4 +2411,4 @@ public class IgniteConfiguration {
@Override public String toString() {
return S.toString(IgniteConfiguration.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c02dc59..da8cf3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -733,6 +733,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ackEnvironmentVariables();
ackCacheConfiguration();
ackP2pConfiguration();
+ ackRebalanceConfiguration();
// Run background network diagnostics.
GridDiagnostic.runBackgroundCheck(gridName, execSvc, log);
@@ -2135,6 +2136,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/**
*
*/
+ private void ackRebalanceConfiguration() throws IgniteCheckedException {
+ if (cfg.getSystemThreadPoolSize() <= cfg.getRebalanceThreadPoolSize())
+ throw new IgniteCheckedException("Rebalance thread pool size exceed or equals System thread pool size. " +
+ "Change IgniteConfiguration.rebalanceThreadPoolSize property before next start.");
+ }
+
+ /**
+ *
+ */
private void ackCacheConfiguration() {
CacheConfiguration[] cacheCfgs = cfg.getCacheConfiguration();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 02b28c5..7d2b2dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -2035,6 +2035,7 @@ public class IgnitionEx {
cache.setAffinity(new RendezvousAffinityFunction(false, 20));
cache.setNodeFilter(CacheConfiguration.ALL_NODES);
cache.setStartSize(300);
+ cache.setRebalanceOrder(-2);//Prior to other system caches.
return cache;
}
@@ -2055,6 +2056,7 @@ public class IgnitionEx {
cache.setWriteSynchronizationMode(FULL_SYNC);
cache.setAffinity(new RendezvousAffinityFunction(false, 100));
cache.setNodeFilter(CacheConfiguration.ALL_NODES);
+ cache.setRebalanceOrder(-1);//Prior to user caches.
return cache;
}
@@ -2075,6 +2077,7 @@ public class IgnitionEx {
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setCacheMode(cfg.getCacheMode());
ccfg.setNodeFilter(CacheConfiguration.ALL_NODES);
+ ccfg.setRebalanceOrder(-1);//Prior to user caches.
if (cfg.getCacheMode() == PARTITIONED)
ccfg.setBackups(cfg.getBackups());
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 079015c..ae8c753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
@@ -684,7 +685,12 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- // [-3..112] - this
+ case 114:
+ msg = new GridDhtPartitionSupplyMessageV2();
+
+ break;
+
+ // [-3..114] - this
// [120..123] - DR
// [-4..-22] - SQL
default:
@@ -722,4 +728,4 @@ public class GridIoMessageFactory implements MessageFactory {
CUSTOM.put(type, c);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 476a96c..1fe55d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -483,6 +484,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
+ case 114: {
+ GridDhtPartitionSupplyMessageV2 req = (GridDhtPartitionSupplyMessageV2)msg;
+
+ U.error(log, "Supply message v2 cannot be unmarshalled.", req.classError());
+ }
+
+ break;
+
default:
throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
+ msg + "]");
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4bf0aa1..4e92ed4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -453,7 +453,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (cctx.swap().offheapSwapEvict(key, entry, partition(), evictVer)) {
assert !hasValueUnlocked() : this;
- obsolete = markObsolete0(obsoleteVer, false);
+ obsolete = markObsolete0(obsoleteVer, false, null);
assert obsolete : this;
}
@@ -1303,7 +1303,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
synchronized (this) {
// If entry is still removed.
if (newVer == ver) {
- if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true))) {
+ if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) {
if (log.isDebugEnabled())
log.debug("Entry could not be marked obsolete (it is still used): " + this);
}
@@ -2420,7 +2420,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
if ((!hasReaders() || readers)) {
// markObsolete will clear the value.
- if (!(marked = markObsolete0(ver, true))) {
+ if (!(marked = markObsolete0(ver, true, null))) {
if (log.isDebugEnabled())
log.debug("Entry could not be marked obsolete (it is still used): " + this);
@@ -2478,7 +2478,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean obsolete;
synchronized (this) {
- obsolete = markObsolete0(ver, true);
+ obsolete = markObsolete0(ver, true, null);
}
if (obsolete)
@@ -2511,7 +2511,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
else
- obsolete = markObsolete0(ver, true);
+ obsolete = markObsolete0(ver, true, null);
}
}
}
@@ -2539,7 +2539,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (!this.ver.equals(ver))
return false;
- marked = markObsolete0(ver, true);
+ marked = markObsolete0(ver, true, null);
}
if (marked)
@@ -2555,9 +2555,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
*
* @param ver Version.
* @param clear {@code True} to clear.
+ * @param extras Predefined extras.
* @return {@code True} if entry is obsolete, {@code false} if entry is still used by other threads or nodes.
*/
- protected final boolean markObsolete0(GridCacheVersion ver, boolean clear) {
+ protected final boolean markObsolete0(GridCacheVersion ver, boolean clear, GridCacheObsoleteEntryExtras extras) {
assert Thread.holdsLock(this);
GridCacheVersion obsoleteVer = obsoleteVersionExtras();
@@ -2572,7 +2573,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (mvcc == null || mvcc.isEmpty(ver)) {
obsoleteVer = ver;
- obsoleteVersionExtras(obsoleteVer);
+ obsoleteVersionExtras(obsoleteVer, extras);
if (clear)
value(null);
@@ -2896,7 +2897,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
synchronized (this) {
if (checkExpired()) {
- rmv = markObsolete0(cctx.versions().next(this.ver), true);
+ rmv = markObsolete0(cctx.versions().next(this.ver), true, null);
return null;
}
@@ -3366,7 +3367,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
else {
- if (markObsolete0(obsoleteVer, true))
+ if (markObsolete0(obsoleteVer, true, null))
obsolete = true; // Success, will return "true".
}
}
@@ -3688,7 +3689,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject prev = saveOldValueUnlocked(false);
- if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+ if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) {
if (swap) {
if (!isStartVersion()) {
try {
@@ -3736,7 +3737,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject prevVal = saveValueForIndexUnlocked();
- if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+ if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) {
if (swap) {
if (!isStartVersion()) {
try {
@@ -3812,7 +3813,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridCacheBatchSwapEntry ret = null;
try {
- if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+ if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) {
if (!isStartVersion() && hasValueUnlocked()) {
if (cctx.offheapTiered() && hasOffHeapPointer()) {
if (cctx.swap().offheapEvictionEnabled())
@@ -3871,7 +3872,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return false;
if (checkExpired()) {
- rmv = markObsolete0(cctx.versions().next(this.ver), true);
+ rmv = markObsolete0(cctx.versions().next(this.ver), true, null);
return false;
}
@@ -3984,9 +3985,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @param obsoleteVer Obsolete version.
*/
- protected void obsoleteVersionExtras(@Nullable GridCacheVersion obsoleteVer) {
- extras = (extras != null) ? extras.obsoleteVersion(obsoleteVer) : obsoleteVer != null ?
- new GridCacheObsoleteEntryExtras(obsoleteVer) : null;
+ protected void obsoleteVersionExtras(@Nullable GridCacheVersion obsoleteVer, GridCacheObsoleteEntryExtras ext) {
+ extras = (extras != null) ?
+ extras.obsoleteVersion(obsoleteVer) :
+ obsoleteVer != null ?
+ (ext != null) ? ext : new GridCacheObsoleteEntryExtras(obsoleteVer) :
+ null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index adc2174..6793f9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,12 +21,15 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Queue;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,9 +52,11 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -65,8 +70,10 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
@@ -75,6 +82,7 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
@@ -85,6 +93,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
@@ -132,6 +141,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** */
private GridFutureAdapter<?> reconnectExchangeFut;
+ /** */
+ private final Queue<Callable<Boolean>> rebalancingQueue = new ConcurrentLinkedDeque8<>();
+
/**
* Partition map futures.
* This set also contains already completed exchange futures to address race conditions when coordinator
@@ -309,6 +321,34 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchWorker.futQ.addFirst(fut);
+ if (!cctx.kernalContext().clientNode()) {
+
+ for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
+ final int idx = cnt;
+
+ cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() {
+ @Override public void apply(final UUID id, final GridCacheMessage m) {
+ if (!enterBusy())
+ return;
+
+ try {
+ if (m instanceof GridDhtPartitionSupplyMessageV2)
+ cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(
+ idx, id, (GridDhtPartitionSupplyMessageV2)m);
+ else if (m instanceof GridDhtPartitionDemandMessage)
+ cctx.cacheContext(m.cacheId).preloader().handleDemandMessage(
+ idx, id, (GridDhtPartitionDemandMessage)m);
+ else
+ log.error("Unsupported message type: " + m.getClass().getName());
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+ });
+ }
+ }
+
new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
if (reconnect) {
@@ -368,6 +408,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
+ /**
+ * @param idx
+ * @return topic
+ */
+ public static Object rebalanceTopic(int idx) {
+ return TOPIC_CACHE.topic("Rebalance", idx);
+ }
+
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
cctx.gridEvents().removeLocalEventListener(discoLsnr);
@@ -392,6 +440,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (AffinityReadyFuture f : readyFuts.values())
f.onDone(stopErr);
+ if (!cctx.kernalContext().clientNode())
+ for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
+ cctx.io().removeOrderedHandler(rebalanceTopic(cnt));
+ }
+
U.cancel(exchWorker);
if (log.isDebugEnabled())
@@ -1103,9 +1156,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean startEvtFired = false;
+ int cnt = 0;
+
+ IgniteInternalFuture asyncStartFut = null;
+
while (!isCancelled()) {
GridDhtPartitionsExchangeFuture exchFut = null;
+ cnt++;
+
try {
boolean preloadFinished = true;
@@ -1220,12 +1279,106 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
if (assignsMap != null) {
+ rebalancingQueue.clear();
+
+ NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
+
for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
int cacheId = e.getKey();
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
- cacheCtx.preloader().addAssignments(e.getValue(), forcePreload);
+ int order = cacheCtx.config().getRebalanceOrder();
+
+ if (orderMap.get(order) == null)
+ orderMap.put(order, new LinkedList<Integer>());
+
+ orderMap.get(order).add(cacheId);
+ }
+
+ Callable<Boolean> marsR = null;
+
+ //Ordered rebalance scheduling.
+ for (Integer order : orderMap.keySet()) {
+ for (Integer cacheId : orderMap.get(order)) {
+ GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+ List<String> waitList = new LinkedList<>();
+
+ for (List<Integer> cIds : orderMap.headMap(order).values()) {
+ for (Integer cId : cIds) {
+ waitList.add(cctx.cacheContext(cId).name());
+ }
+ }
+
+ Callable<Boolean> r = cacheCtx.preloader().addAssignments(
+ assignsMap.get(cacheId), forcePreload, waitList, cnt);
+
+ if (r != null) {
+ U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
+ ", waitList=" + waitList.toString() + "]");
+
+ if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
+ marsR = r;
+ else
+ rebalancingQueue.add(r);
+ }
+ }
+ }
+
+ if (asyncStartFut != null)
+ asyncStartFut.get(); // Wait for thread stop.
+
+ if (marsR != null || !rebalancingQueue.isEmpty()) {
+ if (futQ.isEmpty()) {
+ U.log(log, "Starting caches rebalancing [top=" + exchFut.topologyVersion() + "]");
+
+ if (marsR != null)
+ try {
+ marsR.call();//Marshaller cache rebalancing launches in sync way.
+ }
+ catch (Exception ex) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send initial demand request to node");
+
+ continue;
+ }
+
+ final GridFutureAdapter fut = new GridFutureAdapter();
+
+ asyncStartFut = fut;
+
+ cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
+ @Override public Boolean call() {
+ try {
+ while (true) {
+ Callable<Boolean> r = rebalancingQueue.poll();
+
+ if (r == null)
+ return false;
+
+ if (!r.call())
+ return false;
+ }
+ }
+ catch (Exception ex) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send initial demand request to node");
+
+ return false;
+ }
+ finally {
+ fut.onDone();
+ }
+ }
+ }, /*system pool*/ true);
+ }
+ else {
+ U.log(log, "Obsolete exchange, skipping rebalancing [top=" + exchFut.topologyVersion() + "]");
+ }
+ }
+ else {
+ U.log(log, "Nothing scheduled, skipping rebalancing [top=" + exchFut.topologyVersion() + "]");
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 755958e..79861a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -18,9 +18,14 @@
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.lang.IgnitePredicate;
@@ -90,8 +95,11 @@ public interface GridCachePreloader {
*
* @param assignments Assignments to add.
* @param forcePreload Force preload flag.
+ * @param caches Caches whose rebalancing must be finished before this rebalancing is started.
+ * @param cnt Counter.
*/
- public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload);
+ public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
+ Collection<String> caches, int cnt);
/**
* @param p Preload predicate.
@@ -115,6 +123,11 @@ public interface GridCachePreloader {
public IgniteInternalFuture<?> syncFuture();
/**
+ * @return Future which will complete when preloading is finished on the current topology.
+ */
+ public IgniteInternalFuture<Boolean> rebalanceFuture();
+
+ /**
* Requests that preloader sends the request for the key.
*
* @param keys Keys to request.
@@ -132,4 +145,30 @@ public interface GridCachePreloader {
* Unwinds undeploys.
*/
public void unwindUndeploys();
-}
\ No newline at end of file
+
+
+ /**
+ * Handles Supply message.
+ *
+ * @param idx Index.
+ * @param id Node Id.
+ * @param s Supply message.
+ */
+ public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s);
+
+ /**
+ * Handles Demand message.
+ *
+ * @param idx Index.
+ * @param id Node Id.
+ * @param d Demand message.
+ */
+ public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d);
+
+ /**
+ * Evicts partition asynchronously.
+ *
+ * @param part Partition.
+ */
+ public void evictPartitionAsync(GridDhtLocalPartition part);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 5405449..b784383 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -18,11 +18,16 @@
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -36,7 +41,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
/** Cache context. */
protected final GridCacheContext<?, ?> cctx;
- /** Logger.*/
+ /** Logger. */
protected final IgniteLogger log;
/** Affinity. */
@@ -113,12 +118,28 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
+ return new GridFinishedFuture<>(true);
+ }
+
+ /** {@inheritDoc} */
@Override public void unwindUndeploys() {
cctx.deploy().unwind(cctx);
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+ @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 s) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys,
+ AffinityTopologyVersion topVer) {
return new GridFinishedFuture<>();
}
@@ -143,7 +164,13 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
+ @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
+ Collection<String> caches, int cnt) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
// No-op.
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 722e570..c2acd99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -31,9 +31,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
-import java.util.NavigableMap;
import java.util.Set;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -99,7 +97,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManag
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -162,12 +159,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Map of proxies. */
private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies;
- /** Map of preload finish futures grouped by preload order. */
- private final NavigableMap<Integer, IgniteInternalFuture<?>> preloadFuts;
-
- /** Maximum detected rebalance order. */
- private int maxRebalanceOrder;
-
/** Caches stop sequence. */
private final Deque<String> stopSeq;
@@ -209,7 +200,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
caches = new ConcurrentHashMap<>();
jCacheProxies = new ConcurrentHashMap<>();
- preloadFuts = new TreeMap<>();
stopSeq = new LinkedList<>();
}
@@ -392,10 +382,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache [cacheName=" +
U.maskName(cc.getName()) + ']');
- if (cc.getRebalanceMode() != CacheRebalanceMode.NONE) {
- assertParameter(cc.getRebalanceThreadPoolSize() > 0, "rebalanceThreadPoolSize > 0");
+ if (cc.getRebalanceMode() != CacheRebalanceMode.NONE)
assertParameter(cc.getRebalanceBatchSize() > 0, "rebalanceBatchSize > 0");
- }
if (cc.getCacheMode() == PARTITIONED || cc.getCacheMode() == REPLICATED) {
if (cc.getAtomicityMode() == ATOMIC && cc.getWriteSynchronizationMode() == FULL_ASYNC)
@@ -614,8 +602,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
"Deployment mode for cache is not CONTINUOUS or SHARED.");
}
- maxRebalanceOrder = validatePreloadOrder(ctx.config().getCacheConfiguration());
-
ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class,
new CustomEventListener<DynamicCacheChangeBatch>() {
@Override public void onCustomEvent(ClusterNode snd,
@@ -846,31 +832,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
mgr.onKernalStart(false);
- for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
- GridCacheAdapter cache = e.getValue();
-
- if (maxRebalanceOrder > 0) {
- CacheConfiguration cfg = cache.configuration();
-
- int order = cfg.getRebalanceOrder();
-
- if (order > 0 && order != maxRebalanceOrder && cfg.getCacheMode() != LOCAL) {
- GridCompoundFuture fut = (GridCompoundFuture)preloadFuts.get(order);
-
- if (fut == null) {
- fut = new GridCompoundFuture<>();
-
- preloadFuts.put(order, fut);
- }
-
- fut.add(cache.preloader().syncFuture());
- }
- }
- }
-
- for (IgniteInternalFuture<?> fut : preloadFuts.values())
- ((GridCompoundFuture<Object, Object>)fut).markInitialized();
-
for (GridCacheAdapter<?, ?> cache : caches.values())
onKernalStart(cache);
@@ -2779,19 +2740,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * Gets preload finish future for preload-ordered cache with given order. I.e. will get compound preload future
- * with maximum order less than {@code order}.
- *
- * @param order Cache order.
- * @return Compound preload future or {@code null} if order is minimal order found.
- */
- @Nullable public IgniteInternalFuture<?> orderedPreloadFuture(int order) {
- Map.Entry<Integer, IgniteInternalFuture<?>> entry = preloadFuts.lowerEntry(order);
-
- return entry == null ? null : entry.getValue();
- }
-
- /**
* @param spaceName Space name.
* @param keyBytes Key bytes.
* @param valBytes Value bytes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index be2f3d3..b2279ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
+import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
@@ -539,7 +540,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
* @return {@code True} if entry was not being used, passed the filter and could be removed.
* @throws IgniteCheckedException If failed to remove from swap.
*/
- public boolean clearInternal(GridCacheVersion ver, boolean swap) throws IgniteCheckedException {
+ public boolean clearInternal(GridCacheVersion ver, boolean swap, GridCacheObsoleteEntryExtras extras) throws IgniteCheckedException {
boolean rmv = false;
try {
@@ -548,7 +549,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
// Call markObsolete0 to avoid recursive calls to clear if
// we are clearing dht local partition (onMarkedObsolete should not be called).
- if (!markObsolete0(ver, false)) {
+ if (!markObsolete0(ver, false, extras)) {
if (log.isDebugEnabled())
log.debug("Entry could not be marked obsolete (it is still used or has readers): " + this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 4f124e6..b3c13a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -17,6 +17,17 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicStampedReference;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -27,10 +38,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.GridCircularBuffer;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -38,7 +49,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -46,18 +56,6 @@ import org.jetbrains.annotations.NotNull;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.LongAdder8;
-import javax.cache.CacheException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicStampedReference;
-import java.util.concurrent.locks.ReentrantLock;
-
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
@@ -286,7 +284,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
}
// Attempt to evict.
- tryEvict(true);
+ cctx.preloader().evictPartitionAsync(this);
}
/**
@@ -411,7 +409,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
// Decrement reservations.
if (state.compareAndSet(s, s, reservations, --reservations)) {
- tryEvict(true);
+ cctx.preloader().evictPartitionAsync(this);
break;
}
@@ -479,7 +477,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
* @param updateSeq Update sequence.
* @return Future for evict attempt.
*/
- IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) {
+ void tryEvictAsync(boolean updateSeq) {
if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
state.compareAndSet(RENTING, EVICTED, 0, 0)) {
if (log.isDebugEnabled())
@@ -497,15 +495,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
clearDeferredDeletes();
-
- return new GridFinishedFuture<>(true);
}
-
- return cctx.closures().callLocalSafe(new GPC<Boolean>() {
- @Override public Boolean call() {
- return tryEvict(true);
- }
- }, /*system pool*/ true);
+ else {
+ cctx.preloader().evictPartitionAsync(this);
+ }
}
/**
@@ -521,12 +514,11 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
}
/**
- * @param updateSeq Update sequence.
* @return {@code True} if entry has been transitioned to state EVICTED.
*/
- boolean tryEvict(boolean updateSeq) {
+ public void tryEvict() {
if (state.getReference() != RENTING || state.getStamp() != 0 || groupReserved())
- return false;
+ return;
// Attempt to evict partition entries from cache.
clearAll();
@@ -545,14 +537,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
rent.onDone();
- ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
+ ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, true);
clearDeferredDeletes();
-
- return true;
}
-
- return false;
}
/**
@@ -592,7 +580,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
*
*/
void onUnlock() {
- tryEvict(true);
+ cctx.preloader().evictPartitionAsync(this);
}
/**
@@ -632,6 +620,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
it = F.concat(it, unswapIt);
}
+ GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer);
+
try {
while (it.hasNext()) {
GridDhtCacheEntry cached = null;
@@ -639,7 +629,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
try {
cached = it.next();
- if (cached.clearInternal(clearVer, swap)) {
+ if (cached.clearInternal(clearVer, swap, extras)) {
map.remove(cached.key(), cached);
if (!cached.isInternal()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 848ad87..885b0dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -116,6 +116,13 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
}
/**
+ * @param updateSeq Update sequence.
+ */
+ void updateSequence(long updateSeq) {
+ this.updateSeq = updateSeq;
+ }
+
+ /**
* @return Update sequence.
*/
long updateSequence() {
@@ -320,7 +327,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts.size(), "super",
+ return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts != null ? parts.size() : 0, "super",
super.toString());
}
}