You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/11/02 14:12:20 UTC
ignite git commit: review
Repository: ignite
Updated Branches:
refs/heads/ignite-1093-3 630c5e186 -> bc0423dfd
review
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc0423df
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc0423df
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc0423df
Branch: refs/heads/ignite-1093-3
Commit: bc0423dfdc6e1e2257f7507537e711ca1b33e253
Parents: 630c5e1
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon Nov 2 16:12:11 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon Nov 2 16:12:11 2015 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 4 +--
.../configuration/IgniteConfiguration.java | 12 +++++----
.../GridCachePartitionExchangeManager.java | 13 +++++-----
.../processors/cache/GridCachePreloader.java | 1 -
.../distributed/dht/GridDhtCacheEntry.java | 8 ++++--
.../distributed/dht/GridDhtLocalPartition.java | 7 +++---
.../GridDhtPartitionDemandMessage.java | 5 ++--
.../dht/preloader/GridDhtPartitionDemander.java | 26 +++++++++++++-------
.../dht/preloader/GridDhtPreloader.java | 14 ++++++-----
.../GridCacheRebalancingSyncSelfTest.java | 9 +++++--
10 files changed, 59 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 3a9d632..1e1f437 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1279,23 +1279,23 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
return this;
}
- @Deprecated
/**
* Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
*
* @return Size of rebalancing thread pool.
*/
+ @Deprecated
public int getRebalanceThreadPoolSize() {
return rebalancePoolSize;
}
- @Deprecated
/**
* Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
*
* @param rebalancePoolSize Size of rebalancing thread pool.
* @return {@code this} for chaining.
*/
+ @Deprecated
public CacheConfiguration<K, V> setRebalanceThreadPoolSize(int rebalancePoolSize) {
this.rebalancePoolSize = rebalancePoolSize;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 87fcf3f..02b1066 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -149,8 +149,8 @@ public class IgniteConfiguration {
/** Default keep alive time for public thread pool. */
public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
- /** Default limit of threads used at rebalance. */
- public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 1;// Has minimal affect on the operation of the grid.
+ /** Default limit of threads used for rebalance. */
+ public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 1;
/** Default max queue capacity of public thread pool. */
public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
@@ -1342,17 +1342,19 @@ public class IgniteConfiguration {
* Minimum is 1.
* @return count.
*/
- public int getRebalanceThreadPoolSize(){
+ public int getRebalanceThreadPoolSize() {
return Math.max(1, rebalanceThreadPoolSize);
}
/**
* Sets Max count of threads can be used at rebalancing.
- * Minimum is 1.
+ *
+ * Default is {@code 1} which has minimal impact on the operation of the grid.
+ *
* @param size Size.
* @return {@code this} for chaining.
*/
- public IgniteConfiguration setRebalanceThreadPoolSize(int size){
+ public IgniteConfiguration setRebalanceThreadPoolSize(int size) {
this.rebalanceThreadPoolSize = size;
return this;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 382c975..6ca4717 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -322,7 +322,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchWorker.futQ.addFirst(fut);
if (!cctx.kernalContext().clientNode()) {
-
for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
final int idx = cnt;
@@ -409,8 +408,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param idx
- * @return topic
+ * @param idx Index.
+ * @return Topic for index.
*/
public static Object rebalanceTopic(int idx) {
return TOPIC_CACHE.topic("Rebalance", idx);
@@ -440,10 +439,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (AffinityReadyFuture f : readyFuts.values())
f.onDone(stopErr);
- if (!cctx.kernalContext().clientNode())
- for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
+ if (!cctx.kernalContext().clientNode()) {
+ for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++)
cctx.io().removeOrderedHandler(rebalanceTopic(cnt));
- }
+ }
U.cancel(exchWorker);
@@ -1338,7 +1337,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (marsR != null)
try {
- marsR.call();//Marshaller cache rebalancing launches in sync way.
+ marsR.call(); //Marshaller cache rebalancing launches in sync way.
}
catch (Exception ex) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index b2bb8f1..cda392c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -146,7 +146,6 @@ public interface GridCachePreloader {
*/
public void unwindUndeploys();
-
/**
* Handles Supply message.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index f6f49f6..392ad6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -555,7 +555,11 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
* @return {@code True} if entry was not being used, passed the filter and could be removed.
* @throws IgniteCheckedException If failed to remove from swap.
*/
- public boolean clearInternal(GridCacheVersion ver, boolean swap, GridCacheObsoleteEntryExtras extras) throws IgniteCheckedException {
+ public boolean clearInternal(
+ GridCacheVersion ver,
+ boolean swap,
+ GridCacheObsoleteEntryExtras extras
+ ) throws IgniteCheckedException {
boolean rmv = false;
try {
@@ -820,4 +824,4 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
return S.toString(ReaderId.class, this);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index b3c13a8..6a93b04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -475,10 +475,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
/**
* @param updateSeq Update sequence.
- * @return Future for evict attempt.
*/
void tryEvictAsync(boolean updateSeq) {
if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
+ state.getReference() == RENTING && state.getStamp() == 0 &&
state.compareAndSet(RENTING, EVICTED, 0, 0)) {
if (log.isDebugEnabled())
log.debug("Evicted partition: " + this);
@@ -496,9 +496,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
clearDeferredDeletes();
}
- else {
+ else
cctx.preloader().evictPartitionAsync(this);
- }
}
/**
@@ -514,7 +513,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
}
/**
- * @return {@code True} if entry has been transitioned to state EVICTED.
+ *
*/
public void tryEvict() {
if (state.getReference() != RENTING || state.getStamp() != 0 || groupReserved())
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index e99fa9d..53c3d90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -330,7 +330,8 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts != null ? parts.size() : 0, "super",
- super.toString());
+ return S.toString(GridDhtPartitionDemandMessage.class, this,
+ "partCnt", parts != null ? parts.size() : 0,
+ "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 0c41b8b..2c9b422 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -138,6 +138,7 @@ public class GridDhtPartitionDemander {
* Start.
*/
void start() {
+ // No-op.
}
/**
@@ -224,9 +225,11 @@ public class GridDhtPartitionDemander {
*/
private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException {
if (log.isDebugEnabled())
- log.debug("Waiting for " + name + " cache rebalancing [cacheName=" + cctx.name() + ']');
+ log.debug("Waiting for another cache to start rebalancing [cacheName=" + cctx.name() +
+ ", waitCache=" + name + ']');
- RebalanceFuture wFut = (RebalanceFuture)cctx.kernalContext().cache().internalCache(name).preloader().rebalanceFuture();
+ RebalanceFuture wFut = (RebalanceFuture)cctx.kernalContext().cache().internalCache(name)
+ .preloader().rebalanceFuture();
if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) {
if (!wFut.get()) {
@@ -326,8 +329,10 @@ public class GridDhtPartitionDemander {
/**
* @param fut Future.
*/
- private boolean requestPartitions(RebalanceFuture fut,
- GridDhtPreloaderAssignments assigns) throws IgniteCheckedException {
+ private boolean requestPartitions(
+ RebalanceFuture fut,
+ GridDhtPreloaderAssignments assigns
+ ) throws IgniteCheckedException {
for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
if (topologyChanged(fut))
return false;
@@ -414,7 +419,7 @@ public class GridDhtPartitionDemander {
* @return String representation of partitions list.
*/
private String partitionsList(Collection<Integer> c) {
- LinkedList<Integer> s = new LinkedList<>(c);
+ List<Integer> s = new ArrayList<>(c);
Collections.sort(s);
@@ -428,6 +433,7 @@ public class GridDhtPartitionDemander {
while (sit.hasNext()) {
int p = sit.next();
+
if (start == -1) {
start = p;
prev = p;
@@ -465,7 +471,8 @@ public class GridDhtPartitionDemander {
public void handleSupplyMessage(
int idx,
final UUID id,
- final GridDhtPartitionSupplyMessageV2 supply) {
+ final GridDhtPartitionSupplyMessageV2 supply
+ ) {
AffinityTopologyVersion topVer = supply.topologyVersion();
final RebalanceFuture fut = rebalanceFut;
@@ -478,9 +485,8 @@ public class GridDhtPartitionDemander {
if (!fut.isActual(supply.updateSequence())) // Current future have another update sequence.
return; // Supple message based on another future.
- if (topologyChanged(fut)) { // Topology already changed (for the future that supply message based on).
+ if (topologyChanged(fut)) // Topology already changed (for the future that supply message based on).
return;
- }
if (log.isDebugEnabled())
log.debug("Received supply message: " + supply);
@@ -525,6 +531,7 @@ public class GridDhtPartitionDemander {
continue;
}
+
if (!preloadEntry(node, p, entry, topVer)) {
if (log.isDebugEnabled())
log.debug("Got entries for invalid partition during " +
@@ -568,9 +575,10 @@ public class GridDhtPartitionDemander {
}
// Only request partitions based on latest topology version.
- for (Integer miss : supply.missed())
+ for (Integer miss : supply.missed()) {
if (cctx.affinity().localNode(miss, topVer))
fut.partitionMissed(id, miss);
+ }
for (Integer miss : supply.missed())
fut.partitionDone(id, miss);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 46a2675..c3472b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -80,7 +80,9 @@ import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
* DHT cache preloader.
*/
public class GridDhtPreloader extends GridCachePreloaderAdapter {
- /** */
+ /**
+ *
+ */
public static final IgniteProductVersion REBALANCING_VER_2_SINCE = IgniteProductVersion.fromString("1.5.0");
/** Default preload resend timeout. */
@@ -115,7 +117,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
new ConcurrentHashMap8<>();
/** */
- private final Queue<GridDhtLocalPartition> partitionsToEvict = new ConcurrentLinkedDeque8<>();
+ private final ConcurrentLinkedDeque8<GridDhtLocalPartition> partitionsToEvict = new ConcurrentLinkedDeque8<>();
/** */
private final AtomicReference<Integer> partitionsEvictionOwning = new AtomicReference<>(0);
@@ -771,13 +773,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
@Override public void evictPartitionAsync(GridDhtLocalPartition part) {
partitionsToEvict.add(part);
- if (partitionsEvictionOwning.compareAndSet(0, 1)) {
+ if (partitionsEvictionOwning.get() == 0 && partitionsEvictionOwning.compareAndSet(0, 1)) {
cctx.closures().callLocalSafe(new GPC<Boolean>() {
@Override public Boolean call() {
boolean firstRun = true;
while (true) {
- if (!firstRun && !partitionsEvictionOwning.compareAndSet(0, 1))
+ if (!firstRun && !partitionsToEvict.isEmptyx() &&
+ !partitionsEvictionOwning.compareAndSet(0, 1))
return false;
firstRun = false;
@@ -785,9 +788,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
try {
GridDhtLocalPartition part = partitionsToEvict.poll();
- if (part == null) {
+ if (part == null)
return false;
- }
part.tryEvict();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index b17588f..c866a1d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -232,6 +232,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
}
}
};
+
Thread t2 = new Thread() {
@Override public void run() {
while (!concurrentStartFinished) {
@@ -259,7 +260,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
long spend = (System.currentTimeMillis() - start) / 1000;
- log.info("Spend " + spend + " seconds to rebalance entries.");
+ info("Time to rebalance entries: " + spend);
}
/**
@@ -308,6 +309,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
}
}
+ /**
+ *
+ */
protected void checkSupplyContextMapIsEmpty() {
for (Ignite g : G.allGrids()) {
for (GridCacheAdapter c : ((IgniteEx)g).context().cache().internalCaches()) {
@@ -464,9 +468,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
log.info("Spend " + spend + " seconds to rebalance entries.");
}
+ /** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
}
-}
\ No newline at end of file
+}