You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/05/30 17:04:05 UTC
ignite git commit: IGNITE-5232
GridDhtPartitionDemander.requestPartitions invokes sendMessages sequentially,
which leads to a significant increase in node start time on large clusters with
SSL
Repository: ignite
Updated Branches:
refs/heads/ignite-5232-master [created] 99bc4e93a
IGNITE-5232 GridDhtPartitionDemander.requestPartitions invokes sendMessages sequentially, which leads to a significant increase in node start time on large clusters with SSL
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/99bc4e93
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/99bc4e93
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/99bc4e93
Branch: refs/heads/ignite-5232-master
Commit: 99bc4e93aeb18563ab4ab09db00e6804a9889898
Parents: 73ae278
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu May 25 16:27:46 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue May 30 20:03:43 2017 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 103 ++++++++++---------
1 file changed, 52 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/99bc4e93/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 838ccc8..cdbae1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -328,41 +328,21 @@ public class GridDhtPartitionDemander {
return new Runnable() {
@Override public void run() {
- try {
- if (next != null)
- fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> f) {
- try {
- if (f.get()) // Not cancelled.
- next.run(); // Starts next cache rebalancing (according to the order).
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug(e.getMessage());
- }
+ if (next != null)
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> f) {
+ try {
+ if (f.get()) // Not cancelled.
+ next.run(); // Starts next cache rebalancing (according to the order).
}
- });
-
- requestPartitions(fut, assigns);
- }
- catch (IgniteCheckedException e) {
- ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class);
-
- if (cause != null)
- log.warning("Failed to send initial demand request to node. " + e.getMessage());
- else
- log.error("Failed to send initial demand request to node.", e);
-
- fut.cancel();
- }
- catch (Throwable th) {
- log.error("Runtime error caught during initial demand request sending.", th);
-
- fut.cancel();
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug(e.getMessage());
+ }
+ }
+ });
- if (th instanceof Error)
- throw th;
- }
+ requestPartitions(fut, assigns);
}
};
}
@@ -399,10 +379,7 @@ public class GridDhtPartitionDemander {
* @param assigns Assignments.
* @throws IgniteCheckedException If failed.
*/
- private void requestPartitions(
- RebalanceFuture fut,
- GridDhtPreloaderAssignments assigns
- ) throws IgniteCheckedException {
+ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssignments assigns) {
assert fut != null;
if (topologyChanged(fut)) {
@@ -411,7 +388,7 @@ public class GridDhtPartitionDemander {
return;
}
- synchronized (fut) {
+ synchronized (fut) { // Synchronized to prevent consistency issues in case of parallel cancellation.
if (fut.isDone())
return;
@@ -443,7 +420,7 @@ public class GridDhtPartitionDemander {
int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
- List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+ final List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
for (int cnt = 0; cnt < lsnrCnt; cnt++)
sParts.add(new HashSet<Integer>());
@@ -458,26 +435,50 @@ public class GridDhtPartitionDemander {
for (cnt = 0; cnt < lsnrCnt; cnt++) {
if (!sParts.get(cnt).isEmpty()) {
// Create copy.
- GridDhtPartitionDemandMessage initD = createDemandMessage(d, sParts.get(cnt));
+ final GridDhtPartitionDemandMessage initD = createDemandMessage(d, sParts.get(cnt));
initD.topic(rebalanceTopics.get(cnt));
initD.updateSequence(fut.updateSeq);
initD.timeout(cctx.config().getRebalanceTimeout());
- synchronized (fut) {
- if (fut.isDone())
- return;// Future can be already cancelled at this moment and all failovers happened.
+ final int finalCnt = cnt;
- // New requests will not be covered by failovers.
- cctx.io().sendOrderedMessage(node,
- rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
- }
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ try {
+ if (!fut.isDone()) {
+ cctx.io().sendOrderedMessage(node,
+ rebalanceTopics.get(finalCnt), initD, cctx.ioPolicy(), initD.timeout());
+
+ // Cleanup required in case partitions demanded in parallel with cancellation.
+ synchronized (fut) {
+ if (fut.isDone())
+ fut.cleanupRemoteContexts(node.id());
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
+ finalCnt + ", partitions count=" + sParts.get(finalCnt).size() +
+ " (" + partitionsList(sParts.get(finalCnt)) + ")]");
+ }
+ }
+ catch (IgniteCheckedException e) {
+ ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class);
+ if (cause != null)
+ log.warning("Failed to send initial demand request to node. " + e.getMessage());
+ else
+ log.error("Failed to send initial demand request to node.", e);
- if (log.isDebugEnabled())
- log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
- cnt + ", partitions count=" + sParts.get(cnt).size() +
- " (" + partitionsList(sParts.get(cnt)) + ")]");
+ fut.cancel();
+ }
+ catch (Throwable th) {
+ log.error("Runtime error caught during initial demand request sending.", th);
+
+ fut.cancel();
+ }
+ }
+ }, /*system pool*/true);
}
}
}