You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/16 19:46:44 UTC
[2/3] ignite git commit: 1093 correct
1093 correct
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d23d800
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d23d800
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d23d800
Branch: refs/heads/ignite-1093-2
Commit: 5d23d800018f40d00e9b6301f7210b597b893a59
Parents: 93651d8
Author: Anton Vinogradov <av...@apache.org>
Authored: Wed Sep 16 19:47:09 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Wed Sep 16 19:47:09 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionSupplier.java | 123 +++++++++----------
1 file changed, 60 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d23d800/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 1eb1032..7a5641e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -258,92 +258,89 @@ class GridDhtPartitionSupplier {
// Iterator may be null if space does not exist.
if (iter != null) {
- try {
- boolean prepared = false;
+ boolean prepared = false;
- while (iter.hasNext()) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition,
- // so we send '-1' partition and move on.
- s.missed(part);
+ while (iter.hasNext()) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition,
+ // so we send '-1' partition and move on.
+ s.missed(part);
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition " +
- "[part=" + part + ", nodeId=" + id + ']');
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition " +
+ "[part=" + part + ", nodeId=" + id + ']');
- partMissing = true;
+ partMissing = true;
- break; // For.
- }
+ break; // For.
+ }
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- if (!reply(node, d, s))
- return;
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ if (!reply(node, d, s))
+ return;
- // Throttle preloading.
- if (preloadThrottle > 0)
- U.sleep(preloadThrottle);
+ // Throttle preloading.
+ if (preloadThrottle > 0)
+ U.sleep(preloadThrottle);
- if (++bCnt >= maxBatchesCnt) {
- saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
+ if (++bCnt >= maxBatchesCnt) {
+ saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
- swapLsnr = null;
+ swapLsnr = null;
- return;
- }
- else {
- s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
- cctx.cacheId(), d.topologyVersion());
- }
+ return;
}
+ else {
+ s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+ cctx.cacheId(), d.topologyVersion());
+ }
+ }
- Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
+ Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
- GridCacheSwapEntry swapEntry = e.getValue();
+ GridCacheSwapEntry swapEntry = e.getValue();
- GridCacheEntryInfo info = new GridCacheEntryInfo();
+ GridCacheEntryInfo info = new GridCacheEntryInfo();
- info.keyBytes(e.getKey());
- info.ttl(swapEntry.ttl());
- info.expireTime(swapEntry.expireTime());
- info.version(swapEntry.version());
- info.value(swapEntry.value());
+ info.keyBytes(e.getKey());
+ info.ttl(swapEntry.ttl());
+ info.expireTime(swapEntry.expireTime());
+ info.version(swapEntry.version());
+ info.value(swapEntry.value());
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry0(part, info, cctx);
- else {
- if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not send " +
- "cache entry): " + info);
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry0(part, info, cctx);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not send " +
+ "cache entry): " + info);
- continue;
- }
+ continue;
+ }
- // Need to manually prepare cache message.
- if (depEnabled && !prepared) {
- ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
- swapEntry.valueClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
- null;
+ // Need to manually prepare cache message.
+ if (depEnabled && !prepared) {
+ ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+ cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+ swapEntry.valueClassLoaderId() != null ?
+ cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+ null;
- if (ldr == null)
- continue;
+ if (ldr == null)
+ continue;
- if (ldr instanceof GridDeploymentInfo) {
- s.prepare((GridDeploymentInfo)ldr);
+ if (ldr instanceof GridDeploymentInfo) {
+ s.prepare((GridDeploymentInfo)ldr);
- prepared = true;
- }
+ prepared = true;
}
}
-
- if (partMissing)
- continue;
- }
- finally {
- iter.close();
}
+
+ iter.close();//todo close at contexts clear
+
+ if (partMissing)
+ continue;
}
}