You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2019/05/17 12:24:08 UTC
[ignite] branch master updated: Code commenting and reformatting
done previously in researching of IGNITE-5078
This is an automated email from the ASF dual-hosted git repository.
dpavlov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d486f24 Code commenting and reformatting done previously in researching of IGNITE-5078
d486f24 is described below
commit d486f24b1be3f08e8b1e5e9992c4c8718e169299
Author: Dmitriy Pavlov <dp...@apache.org>
AuthorDate: Fri May 17 15:21:31 2019 +0300
Code commenting and reformatting done previously in researching of IGNITE-5078
---
.../ignite/internal/GridKernalGatewayImpl.java | 2 +-
.../cache/GridCachePartitionExchangeManager.java | 4 +-
.../dht/preloader/CacheGroupAffinityMessage.java | 3 +-
.../CachePartitionPartialCountersMap.java | 4 +-
.../dht/preloader/GridDhtForceKeysFuture.java | 6 +-
.../dht/preloader/GridDhtForceKeysRequest.java | 7 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 6 +-
.../GridDhtPartitionDemandLegacyMessage.java | 5 +-
.../preloader/GridDhtPartitionDemandMessage.java | 3 +-
.../dht/preloader/GridDhtPartitionDemander.java | 10 +--
.../dht/preloader/GridDhtPartitionFullMap.java | 4 +-
.../dht/preloader/GridDhtPartitionMap.java | 2 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 9 ++-
.../preloader/GridDhtPartitionSupplyMessage.java | 3 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 88 +++++++++++-----------
.../preloader/GridDhtPartitionsFullMessage.java | 15 ++--
.../preloader/GridDhtPartitionsSingleMessage.java | 15 ++--
.../preloader/GridDhtPartitionsSingleRequest.java | 3 +-
.../dht/preloader/GridDhtPreloader.java | 18 +++--
.../preloader/IgniteDhtDemandedPartitionsMap.java | 5 +-
.../dht/preloader/IgniteRebalanceIteratorImpl.java | 1 -
.../dht/preloader/InitNewCoordinatorFuture.java | 8 +-
.../StopCachesOnClientReconnectExchangeTask.java | 2 +-
.../dht/preloader/latch/ExchangeLatchManager.java | 59 +++++++--------
.../dht/topology/GridClientPartitionTopology.java | 2 +-
.../dht/topology/GridDhtPartitionTopology.java | 2 +-
.../dht/topology/GridDhtPartitionTopologyImpl.java | 4 +-
.../distributed/near/GridNearCacheAdapter.java | 2 +-
28 files changed, 146 insertions(+), 146 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index 75da88e..4738892 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -40,7 +40,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
/** */
private static final long serialVersionUID = 0L;
- /** */
+ /** Lock to prevent activities from running kernal-related calls while the kernal is stopping. */
@GridToStringExclude
private final ReadWriteLock rwLock =
new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 199f725..50a282d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -200,7 +200,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
@GridToStringExclude
private final ConcurrentMap<Integer, GridClientPartitionTopology> clientTops = new ConcurrentHashMap<>();
- /** */
+ /** Last initialized topology future. */
@Nullable private volatile GridDhtPartitionsExchangeFuture lastInitializedFut;
/** */
@@ -1229,7 +1229,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
public void refreshPartitions() { refreshPartitions(cctx.cache().cacheGroups()); }
/**
- * @param nodes Nodes.
+ * @param nodes Target nodes.
* @param msgTopVer Topology version. Will be added to full message.
* @param grps Selected cache groups.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index 7115ac1..0e66d75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -182,7 +182,8 @@ public class CacheGroupAffinityMessage implements Message {
* @param discoCache Discovery data cache.
* @return Nodes list.
*/
- public static List<ClusterNode> toNodes(GridLongList assign, Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) {
+ public static List<ClusterNode> toNodes(GridLongList assign, Map<Long, ClusterNode> nodesByOrder,
+ DiscoCache discoCache) {
List<ClusterNode> assign0 = new ArrayList<>(assign.size());
for (int n = 0; n < assign.size(); n++) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
index 986a100..04a0cdc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
@@ -131,8 +131,7 @@ public class CachePartitionPartialCountersMap implements Serializable {
}
/**
- * Cuts the array sizes according to curIdx. No more entries can be added to this map
- * after this method is called.
+ * Cuts the array sizes according to curIdx. No more entries can be added to this map after this method is called.
*/
public void trim() {
if (partIds != null && curIdx < partIds.length) {
@@ -188,7 +187,6 @@ public class CachePartitionPartialCountersMap implements Serializable {
return updCntrs[idx];
}
-
/**
* @param cntrsMap Partial local counters map.
* @return Partition ID to partition counters map.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index fba1d9d..7d16884 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -302,8 +302,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
*/
private Map<ClusterNode, Set<KeyCacheObject>> map(KeyCacheObject key,
@Nullable Map<ClusterNode, Set<KeyCacheObject>> mappings,
- Collection<ClusterNode> exc)
- {
+ Collection<ClusterNode> exc) {
ClusterNode loc = cctx.localNode();
GridCacheEntryEx e = cctx.dht().peekEx(key);
@@ -407,8 +406,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
}
/**
- * Mini-future for get operations. Mini-futures are only waiting on a single
- * node as opposed to multiple nodes.
+ * Mini-future for get operations. Mini-futures are only waiting on a single node as opposed to multiple nodes.
*/
private class MiniFuture extends GridFutureAdapter<Object> {
/** Mini-future ID. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
index 80c45ef..4ee74ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
@@ -37,8 +37,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
- * Force keys request. This message is sent by node while preloading to force
- * another node to put given keys into the next batch of transmitting entries.
+ * Force keys request. This message is sent by node while preloading to force another node to put given keys into the
+ * next batch of transmitting entries.
*/
public class GridDhtForceKeysRequest extends GridCacheIdMessage implements GridCacheDeployable {
/** */
@@ -121,8 +121,7 @@ public class GridDhtForceKeysRequest extends GridCacheIdMessage implements GridC
return topVer;
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index ab85df3..ade0004 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -95,9 +95,10 @@ public class GridDhtForceKeysResponse extends GridCacheIdMessage implements Grid
/**
* Sets error.
+ *
* @param err Error.
*/
- public void error(IgniteCheckedException err){
+ public void error(IgniteCheckedException err) {
this.err = err;
}
@@ -156,8 +157,7 @@ public class GridDhtForceKeysResponse extends GridCacheIdMessage implements Grid
infos.add(info);
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandLegacyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandLegacyMessage.java
index cd7741b55..60d145d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandLegacyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandLegacyMessage.java
@@ -114,7 +114,7 @@ public class GridDhtPartitionDemandLegacyMessage extends GridCacheGroupIdMessage
workerId = cp.workerId();
topVer = cp.topologyVersion();
- if(!cp.partitions().isEmpty()) {
+ if (!cp.partitions().isEmpty()) {
parts = new HashSet<>(cp.partitions().size());
parts.addAll(cp.partitions().fullSet());
@@ -248,8 +248,7 @@ public class GridDhtPartitionDemandLegacyMessage extends GridCacheGroupIdMessage
return topVer;
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index bae3264..fd1862c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -218,8 +218,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
return this;
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 93602db..5c617a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -456,7 +456,7 @@ public class GridDhtPartitionDemander {
GridDhtPartitionDemandMessage d = e.getValue();
- int rmtStripes = Optional.ofNullable((Integer) node.attribute(IgniteNodeAttributes.ATTR_REBALANCE_POOL_SIZE))
+ int rmtStripes = Optional.ofNullable((Integer)node.attribute(IgniteNodeAttributes.ATTR_REBALANCE_POOL_SIZE))
.orElse(1);
int rmtTotalStripes = rmtStripes <= locStripes ? rmtStripes : locStripes;
@@ -1395,10 +1395,10 @@ public class GridDhtPartitionDemander {
int remainingRoutines = remaining.size() - 1;
U.log(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") +
- "rebalancing [grp=" + grp.cacheOrGroupName() +
- ", supplier=" + nodeId +
- ", topVer=" + topologyVersion() +
- ", progress=" + (routines - remainingRoutines) + "/" + routines + "]"));
+ "rebalancing [grp=" + grp.cacheOrGroupName() +
+ ", supplier=" + nodeId +
+ ", topVer=" + topologyVersion() +
+ ", progress=" + (routines - remainingRoutines) + "/" + routines + "]"));
remaining.remove(nodeId);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
index 4179520..120d5fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
/**
- * Full partition map.
+ * Full partition map from all nodes.
*/
public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap> implements Comparable<GridDhtPartitionFullMap>, Externalizable {
/** */
@@ -225,7 +225,7 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap>
buf.append('{');
- while(true) {
+ while (true) {
Map.Entry<UUID, GridDhtPartitionMap> e = it.next();
UUID nodeId = e.getKey();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index e84869d..5cbf610 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
/**
- * Partition map.
+ * Partition map from single node.
*/
public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Externalizable {
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 7f52856..a9a2027 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -266,10 +266,10 @@ class GridDhtPartitionSupplier {
maxBatchesCnt = 1;
GridDhtPartitionSupplyMessage supplyMsg = new GridDhtPartitionSupplyMessage(
- demandMsg.rebalanceId(),
- grp.groupId(),
- demandMsg.topologyVersion(),
- grp.deploymentEnabled()
+ demandMsg.rebalanceId(),
+ grp.groupId(),
+ demandMsg.topologyVersion(),
+ grp.deploymentEnabled()
);
Set<Integer> remainingParts;
@@ -501,6 +501,7 @@ class GridDhtPartitionSupplier {
/**
* Extracts entry info from row.
+ *
* @param row Cache data row.
* @return Entry info.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 7e281e5..5ac7093 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -219,7 +219,8 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
* @param cacheObjCtx Cache object context.
* @throws IgniteCheckedException If failed.
*/
- void addEntry0(int p, boolean historical, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
+ void addEntry0(int p, boolean historical, GridCacheEntryInfo info, GridCacheSharedContext ctx,
+ CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
assert info != null;
assert info.key() != null : info;
assert info.value() != null || historical : info;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0ec1b7c..78e46fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -155,9 +155,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private static final IgniteProductVersion FORCE_AFF_REASSIGNMENT_SINCE = IgniteProductVersion.fromString("2.4.3");
/**
- * This may be useful when per-entry (not per-cache based) partition policy is in use.
- * See {@link IgniteSystemProperties#IGNITE_SKIP_PARTITION_SIZE_VALIDATION} for details.
- * Default value is {@code false}.
+ * This may be useful when per-entry (not per-cache based) partition policy is in use. See {@link
+ * IgniteSystemProperties#IGNITE_SKIP_PARTITION_SIZE_VALIDATION} for details. Default value is {@code false}.
*/
private static final boolean SKIP_PARTITION_SIZE_VALIDATION = Boolean.getBoolean(IgniteSystemProperties.IGNITE_SKIP_PARTITION_SIZE_VALIDATION);
@@ -206,9 +205,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private AtomicBoolean added = new AtomicBoolean(false);
/**
- * Discovery event receive latch. There is a race between discovery event processing and single message
- * processing, so it is possible to create an exchange future before the actual discovery event is received.
- * This latch is notified when the discovery event arrives.
+ * Discovery event receive latch. There is a race between discovery event processing and single message processing,
+ * so it is possible to create an exchange future before the actual discovery event is received. This latch is
+ * notified when the discovery event arrives.
*/
@GridToStringExclude
private final CountDownLatch evtLatch = new CountDownLatch(1);
@@ -227,15 +226,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
/**
- * Message received from node joining cluster (if this is 'node join' exchange),
- * needed if this exchange is merged with another one.
+ * Message received from node joining cluster (if this is 'node join' exchange), needed if this exchange is merged
+ * with another one.
*/
@GridToStringExclude
private GridDhtPartitionsSingleMessage pendingJoinMsg;
/**
- * Messages received on non-coordinator are stored in case if this node
- * becomes coordinator.
+ * Messages received on a non-coordinator node are stored in case this node becomes coordinator.
*/
private final Map<UUID, GridDhtPartitionsSingleMessage> pendingSingleMsgs = new ConcurrentHashMap<>();
@@ -626,7 +624,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
|| !firstDiscoEvt0.eventNode().isClient()
|| firstDiscoEvt0.eventNode().isLocal()
|| ((firstDiscoEvt.type() == EVT_NODE_JOINED) &&
- cctx.cache().hasCachesReceivedFromJoin(firstDiscoEvt.eventNode()));
+ cctx.cache().hasCachesReceivedFromJoin(firstDiscoEvt.eventNode()));
}
/**
@@ -638,7 +636,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @return First event discovery event.
- *
*/
public DiscoveryEvent firstEvent() {
return firstDiscoEvt;
@@ -1301,8 +1298,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @param crd Coordinator flag.
- * @throws IgniteCheckedException If failed.
* @return Exchange type.
+ * @throws IgniteCheckedException If failed.
*/
private ExchangeType onAffinityChangeRequest(boolean crd) throws IgniteCheckedException {
assert affChangeMsg != null : this;
@@ -1317,8 +1314,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @param crd Coordinator flag.
- * @throws IgniteCheckedException If failed.
* @return Exchange type.
+ * @throws IgniteCheckedException If failed.
*/
private ExchangeType onClientNodeEvent(boolean crd) throws IgniteCheckedException {
assert firstDiscoEvt.eventNode().isClient() : this;
@@ -1338,8 +1335,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @param crd Coordinator flag.
- * @throws IgniteCheckedException If failed.
* @return Exchange type.
+ * @throws IgniteCheckedException If failed.
*/
private ExchangeType onServerNodeEvent(boolean crd) throws IgniteCheckedException {
assert !firstDiscoEvt.eventNode().isClient() : this;
@@ -1676,7 +1673,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
try {
// This avoids unnessesary waiting for rollback.
partReleaseFut.get(curTimeout > 0 && !txRolledBack ?
- Math.min(curTimeout, waitTimeout) : waitTimeout, TimeUnit.MILLISECONDS);
+ Math.min(curTimeout, waitTimeout) : waitTimeout, TimeUnit.MILLISECONDS);
break;
}
@@ -1695,7 +1692,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
catch (IgniteCheckedException e) {
- U.warn(log,"Unable to await partitions release future", e);
+ U.warn(log, "Unable to await partitions release future", e);
throw e;
}
@@ -1829,11 +1826,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private void dumpPendingObjects(IgniteInternalFuture<?> partReleaseFut, boolean txTimeoutNotifyFlag) {
U.warn(cctx.kernalContext().cluster().diagnosticLog(),
"Failed to wait for partition release future [topVer=" + initialVersion() +
- ", node=" + cctx.localNodeId() + "]");
+ ", node=" + cctx.localNodeId() + "]");
if (txTimeoutNotifyFlag)
U.warn(cctx.kernalContext().cluster().diagnosticLog(), "Consider changing TransactionConfiguration." +
- "txTimeoutOnPartitionMapExchange to non default value to avoid this message.");
+ "txTimeoutOnPartitionMapExchange to non default value to avoid this message.");
U.warn(log, "Partition release future: " + partReleaseFut);
@@ -1951,7 +1948,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @param fullMsg Message to send.
- * @param nodes Nodes.
+ * @param nodes Target nodes.
* @param mergedJoinExchMsgs Messages received from merged 'join node' exchanges.
* @param affinityForJoinedNodes Affinity if was requested by some nodes.
*/
@@ -2005,8 +2002,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
// If exchange has merged, use merged version of exchange id.
GridDhtPartitionExchangeId sndExchId = mergedJoinExchMsgs != null
? Optional.ofNullable(mergedJoinExchMsgs.get(node.id()))
- .map(GridDhtPartitionsAbstractMessage::exchangeId)
- .orElse(exchangeId())
+ .map(GridDhtPartitionsAbstractMessage::exchangeId)
+ .orElse(exchangeId())
: exchangeId();
if (sndExchId != null && !sndExchId.equals(exchangeId())) {
@@ -2274,7 +2271,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (log.isInfoEnabled()) {
log.info("Completed partition exchange [localNode=" + cctx.localNodeId() +
- ", exchange=" + (log.isDebugEnabled() ? this : shortInfo()) + ", topVer=" + topologyVersion() + "]");
+ ", exchange=" + (log.isDebugEnabled() ? this : shortInfo()) + ", topVer=" + topologyVersion() + "]");
if (err == null) {
timeBag.finishGlobalStage("Exchange done");
@@ -2335,7 +2332,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
*
* @param declared Single messages that were expected to be received during exchange.
* @param merged Single messages from nodes that were merged during exchange.
- *
* @return Pair with discovery lag and node id which started exchange later than others.
*/
private T2<Long, UUID> calculateDiscoveryLag(
@@ -2391,7 +2387,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
final int timeout = Math.max(1000,
(int)(cctx.kernalContext().config().getFailureDetectionTimeout() / 2));
- for (;;) {
+ for (; ; ) {
cctx.exchange().exchangerBlockingSectionBegin();
try {
@@ -2561,7 +2557,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
synchronized (mux) {
assert (!isDone() && !initFut.isDone()) || cctx.kernalContext().isStopping() : this;
- assert (mergedWith == null && state == null) || cctx.kernalContext().isStopping() : this;
+ assert (mergedWith == null && state == null) || cctx.kernalContext().isStopping() : this;
state = ExchangeLocalState.MERGED;
@@ -2651,8 +2647,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * Method is called on coordinator in situation when initial ExchangeFuture created on client join event was preempted
- * from exchange history because of IGNITE_EXCHANGE_HISTORY_SIZE property.
+ * Method is called on coordinator in situation when initial ExchangeFuture created on client join event was
+ * preempted from exchange history because of IGNITE_EXCHANGE_HISTORY_SIZE property.
*
* @param node Client node that should try to reconnect to the cluster.
* @param msg Single message received from the client which didn't find original ExchangeFuture.
@@ -2755,8 +2751,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * Tries to fast reply with {@link GridDhtPartitionsFullMessage} on received single message
- * in case of exchange future has already completed.
+ * Tries to fast reply with {@link GridDhtPartitionsFullMessage} on received single message in case of exchange
+ * future has already completed.
*
* @param node Cluster node which sent single message.
* @param msg Single message.
@@ -3165,7 +3161,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * Detect lost partitions.
+ * Detects lost partitions when a node has left or failed. On the topology coordinator this is called once all
+ * {@link GridDhtPartitionsSingleMessage}s have been received. On other nodes it is called when the exchange future
+ * is completed by {@link GridDhtPartitionsFullMessage}.
*
* @param resTopVer Result topology version.
*/
@@ -3238,8 +3236,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * Creates an IgniteCheckedException that is used as root cause of the exchange initialization failure.
- * This method aggregates all the exceptions provided from all participating nodes.
+ * Creates an IgniteCheckedException that is used as root cause of the exchange initialization failure. This method
+ * aggregates all the exceptions provided from all participating nodes.
*
* @param globalExceptions collection exceptions from all participating nodes.
* @return exception that represents a cause of the exchange initialization failure.
@@ -3266,8 +3264,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * Sends {@link DynamicCacheChangeFailureMessage} to all participated nodes
- * that represents a cause of exchange failure.
+ * Sends a {@link DynamicCacheChangeFailureMessage}, which represents the cause of the exchange failure, to all
+ * participating nodes.
*/
private void sendExchangeFailureMessage() {
assert crd != null && crd.isLocal();
@@ -3292,7 +3290,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return;
}
- catch (IgniteCheckedException e) {
+ catch (IgniteCheckedException e) {
if (reconnectOnError(e))
onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
else
@@ -3301,6 +3299,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * Called only on the coordinator node when all {@link GridDhtPartitionsSingleMessage}s were received.
+ *
* @param sndResNodes Additional nodes to send finish message to.
*/
private void onAllReceived(@Nullable Collection<ClusterNode> sndResNodes) {
@@ -3396,7 +3396,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
else
cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, true);
-
doInParallel(
parallelismLvl,
cctx.kernalContext().getSystemExecutorService(),
@@ -3441,7 +3440,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (activateCluster() || changedBaseline())
assignPartitionsStates();
- DiscoveryCustomMessage discoveryCustomMessage = ((DiscoveryCustomEvent) firstDiscoEvt).customMessage();
+ DiscoveryCustomMessage discoveryCustomMessage = ((DiscoveryCustomEvent)firstDiscoEvt).customMessage();
if (discoveryCustomMessage instanceof DynamicCacheChangeBatch) {
if (exchActions != null) {
@@ -3454,7 +3453,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
else if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage
- && ((SnapshotDiscoveryMessage)discoveryCustomMessage).needAssignPartitions())
+ && ((SnapshotDiscoveryMessage)discoveryCustomMessage).needAssignPartitions())
assignPartitionsStates();
}
else {
@@ -3813,7 +3812,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
-
if (affReq != null) {
Map<Integer, CacheGroupAffinityMessage> cachesAff = U.newHashMap(affReq.size());
@@ -4129,7 +4127,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return; // Node is stopping, no need to further process exchange.
}
- assert resTopVer.equals(exchCtx.events().topologyVersion()) : "Unexpected result version [" +
+ assert resTopVer.equals(exchCtx.events().topologyVersion()) : "Unexpected result version [" +
"msgVer=" + resTopVer +
", locVer=" + exchCtx.events().topologyVersion() + ']';
}
@@ -4290,7 +4288,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return;
try {
- assert msg.error() != null: msg;
+ assert msg.error() != null : msg;
// Try to revert all the changes that were done during initialization phase
cctx.affinity().forceCloseCaches(
@@ -4570,7 +4568,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return;
try {
- onBecomeCoordinator((InitNewCoordinatorFuture) fut);
+ onBecomeCoordinator((InitNewCoordinatorFuture)fut);
}
finally {
lock.unlock();
@@ -4918,7 +4916,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
newCrdFut = this.newCrdFut;
}
- if(newCrdFut != null)
+ if (newCrdFut != null)
newCrdFut.addDiagnosticRequest(diagCtx);
if (crd != null) {
@@ -4926,7 +4924,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
diagCtx.exchangeInfo(crd.id(), initialVersion(), "Exchange future waiting for coordinator " +
"response [crd=" + crd.id() + ", topVer=" + initialVersion() + ']');
}
- else if (!remaining.isEmpty()){
+ else if (!remaining.isEmpty()) {
UUID nodeId = remaining.iterator().next();
diagCtx.exchangeInfo(nodeId, initialVersion(), "Exchange future on coordinator waiting for " +
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 1640e8e..a2d0697 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -51,7 +52,9 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
- * Information about partitions of all nodes in topology.
+ * Information about partitions of all nodes in topology. <br> Sent by the topology coordinator once all {@link
+ * GridDhtPartitionsSingleMessage}s have been received. <br> May also be compacted as part of {@link
+ * CacheAffinityChangeMessage} for the node left or node failed case.<br>
*/
public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessage {
/** */
@@ -440,12 +443,12 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@Override public byte[] apply(Object payload) throws IgniteCheckedException {
byte[] marshalled = U.marshal(ctx, payload);
- if(compressed())
+ if (compressed())
marshalled = U.zip(marshalled, ctx.gridConfig().getNetworkCompressionLevel());
return marshalled;
}
- });
+ });
Iterator<byte[]> iterator = marshalled.iterator();
@@ -582,10 +585,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partCntrs2 == null)
partCntrs2 = new IgniteDhtPartitionCountersMap2();
- if(partHistSuppliers == null)
+ if (partHistSuppliers == null)
partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
- if(partsToReload == null)
+ if (partsToReload == null)
partsToReload = new IgniteDhtPartitionsToReloadMap();
if (errs == null)
@@ -820,7 +823,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
public void merge(GridDhtPartitionsFullMessage other, GridDiscoveryManager discovery) {
assert other.exchangeId() == null && exchangeId() == null :
"Both current and merge full message must have exchangeId == null"
- + other.exchangeId() + "," + exchangeId();
+ + other.exchangeId() + "," + exchangeId();
for (Map.Entry<Integer, GridDhtPartitionFullMap> groupAndMap : other.partitions().entrySet()) {
int grpId = groupAndMap.getKey();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index d0d98c7..77f8d5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -41,13 +41,15 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
- * Information about partitions of a single node.
+ * Information about partitions of a single node. <br>
+ *
+ * Sent in response to {@link GridDhtPartitionsSingleRequest} and while processing a partitions exchange future.
*/
public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMessage {
/** */
private static final long serialVersionUID = 0L;
- /** Local partitions. */
+ /** Local partitions. Serialized as {@link #partsBytes}, may be compressed. */
@GridToStringInclude
@GridDirectTransient
private Map<Integer, GridDhtPartitionMap> parts;
@@ -56,7 +58,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
@GridDirectMap(keyType = Integer.class, valueType = Integer.class)
private Map<Integer, Integer> dupPartsData;
- /** Serialized partitions. */
+ /** Serialized local partitions. Unmarshalled to {@link #parts}. */
private byte[] partsBytes;
/** Partitions update counters. */
@@ -102,8 +104,8 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
private long exchangeStartTime;
/**
- * Exchange finish message, sent to new coordinator when it tries to
- * restore state after previous coordinator failed during exchange.
+ * Exchange finish message, sent to new coordinator when it tries to restore state after previous coordinator failed
+ * during exchange.
*/
private GridDhtPartitionsFullMessage finishMsg;
@@ -330,8 +332,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
this.exchangeStartTime = exchangeStartTime;
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index 26d3cde..928c47c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -52,7 +52,8 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
* @param restoreExchId Initial exchange ID for current exchange.
* @return Message.
*/
- static GridDhtPartitionsSingleRequest restoreStateRequest(GridDhtPartitionExchangeId msgExchId, GridDhtPartitionExchangeId restoreExchId) {
+ static GridDhtPartitionsSingleRequest restoreStateRequest(GridDhtPartitionExchangeId msgExchId,
+ GridDhtPartitionExchangeId restoreExchId) {
GridDhtPartitionsSingleRequest msg = new GridDhtPartitionsSingleRequest(msgExchId);
msg.restoreState(true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index ff420a7..8d7ce59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -38,9 +38,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -203,7 +203,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut) {
+ @Override public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId,
+ GridDhtPartitionsExchangeFuture exchFut) {
assert exchFut == null || exchFut.isDone();
// No assignments for disabled preloader.
@@ -218,8 +219,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
assert exchFut == null || exchFut.context().events().topologyVersion().equals(top.readyTopologyVersion()) :
"Topology version mismatch [exchId=" + exchId +
- ", grp=" + grp.name() +
- ", topVer=" + top.readyTopologyVersion() + ']';
+ ", grp=" + grp.name() +
+ ", topVer=" + top.readyTopologyVersion() + ']';
GridDhtPreloaderAssignments assignments = new GridDhtPreloaderAssignments(exchId, topVer);
@@ -521,7 +522,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
* @return Future for request.
*/
@SuppressWarnings({"unchecked", "RedundantCast"})
- private GridDhtFuture<Object> request0(GridCacheContext cctx, Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+ private GridDhtFuture<Object> request0(GridCacheContext cctx, Collection<KeyCacheObject> keys,
+ AffinityTopologyVersion topVer) {
if (cctx.isNear())
cctx = cctx.near().dht().context();
@@ -587,7 +589,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
paused = true;
}
finally {
- demandLock.writeLock().unlock();
+ demandLock.writeLock().unlock();
}
}
@@ -597,14 +599,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
try {
final List<GridTuple3<Integer, UUID, GridDhtPartitionSupplyMessage>> msgToProc =
- new ArrayList<>(pausedDemanderQueue);
+ new ArrayList<>(pausedDemanderQueue);
pausedDemanderQueue.clear();
final GridDhtPreloader preloader = this;
ctx.kernalContext().closure().runLocalSafe(() -> msgToProc.forEach(
- m -> preloader.handleSupplyMessage(m.get1(), m.get2(), m.get3())
+ m -> preloader.handleSupplyMessage(m.get1(), m.get2(), m.get3())
), GridIoPolicy.SYSTEM_POOL);
paused = false;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
index 9fe3c64..4db8795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
@@ -40,8 +40,7 @@ public class IgniteDhtDemandedPartitionsMap implements Serializable {
public IgniteDhtDemandedPartitionsMap(
@Nullable CachePartitionPartialCountersMap historical,
- @Nullable Set<Integer> full)
- {
+ @Nullable Set<Integer> full) {
this.historical = historical;
this.full = full;
}
@@ -68,6 +67,7 @@ public class IgniteDhtDemandedPartitionsMap implements Serializable {
/**
* Adds partition for preloading from all current data.
+ *
* @param partId Partition ID.
*/
public void addFull(int partId) {
@@ -81,6 +81,7 @@ public class IgniteDhtDemandedPartitionsMap implements Serializable {
/**
* Removes partition.
+ *
* @param partId Partition ID.
* @return {@code True} if changed.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteRebalanceIteratorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteRebalanceIteratorImpl.java
index 75ae89a..eb39c99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteRebalanceIteratorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteRebalanceIteratorImpl.java
@@ -60,7 +60,6 @@ public class IgniteRebalanceIteratorImpl implements IgniteRebalanceIterator {
private boolean closed;
/**
- *
* @param fullIterators
* @param historicalIterator
* @throws IgniteCheckedException
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index 06e33ab..f311817 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -44,7 +44,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.*;
+import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.exchangeProtocolVersion;
/**
*
@@ -244,7 +244,7 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture implements Igni
if (fullMsg0 != null && fullMsg0.resultTopologyVersion() != null) {
assert fullMsg == null || fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion());
- fullMsg = fullMsg0;
+ fullMsg = fullMsg0;
}
else
msgs.put(node, msg);
@@ -268,7 +268,7 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture implements Igni
AffinityTopologyVersion resVer = fullMsg.resultTopologyVersion();
for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator();
- it.hasNext();) {
+ it.hasNext(); ) {
Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e = it.next();
GridDhtPartitionExchangeId msgVer = joinedNodes.get(e.getKey().id());
@@ -298,7 +298,7 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture implements Igni
}
}
else {
- for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator(); it.hasNext();) {
+ for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e = it.next();
GridDhtPartitionExchangeId msgVer = joinedNodes.get(e.getKey().id());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/StopCachesOnClientReconnectExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/StopCachesOnClientReconnectExchangeTask.java
index e89bbd1..1492657 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/StopCachesOnClientReconnectExchangeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/StopCachesOnClientReconnectExchangeTask.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
/**
*
*/
-public class StopCachesOnClientReconnectExchangeTask extends GridFutureAdapter<Void>
+public class StopCachesOnClientReconnectExchangeTask extends GridFutureAdapter<Void>
implements CachePartitionExchangeWorkerTask {
/** */
@GridToStringInclude
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
index 722c4a3..263f150 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -61,8 +61,8 @@ public class ExchangeLatchManager {
private static final IgniteProductVersion VERSION_SINCE = IgniteProductVersion.fromString("2.5.0");
/**
- * Exchange latch V2 protocol introduces following optimization:
- * Joining nodes are explicitly excluded from possible latch participants.
+ * Exchange latch V2 protocol introduces following optimization: Joining nodes are explicitly excluded from possible
+ * latch participants.
*/
public static final IgniteProductVersion PROTOCOL_V2_VERSION_SINCE = IgniteProductVersion.fromString("2.5.3");
@@ -95,7 +95,10 @@ public class ExchangeLatchManager {
@GridToStringInclude
private final ConcurrentMap<CompletableLatchUid, ClientLatch> clientLatches = new ConcurrentHashMap<>();
- /** Map (topology version -> joined node on this version). This map is needed to exclude joined nodes from latch participants. */
+ /**
+ * Map (topology version -> joined node on this version). This map is needed to exclude joined nodes from latch
+ * participants.
+ */
@GridToStringExclude
private final ConcurrentMap<AffinityTopologyVersion, ClusterNode> joinedNodes = new ConcurrentHashMap<>();
@@ -116,7 +119,7 @@ public class ExchangeLatchManager {
if (!ctx.clientNode() && !ctx.isDaemon()) {
ctx.io().addMessageListener(GridTopic.TOPIC_EXCHANGE, (nodeId, msg, plc) -> {
if (msg instanceof LatchAckMessage)
- processAck(nodeId, (LatchAckMessage) msg);
+ processAck(nodeId, (LatchAckMessage)msg);
});
// First coordinator initialization.
@@ -144,8 +147,7 @@ public class ExchangeLatchManager {
}
/**
- * Creates server latch with given {@code id} and {@code topVer}.
- * Adds corresponding pending acks to it.
+ * Creates server latch with given {@code id} and {@code topVer}. Adds corresponding pending acks to it.
*
* @param latchUid Latch uid.
* @param participants Participant nodes.
@@ -175,15 +177,16 @@ public class ExchangeLatchManager {
}
/**
- * Creates client latch.
- * If there is final ack corresponds to given {@code id} and {@code topVer}, latch will be completed immediately.
+ * Creates client latch. If there is final ack corresponds to given {@code id} and {@code topVer}, latch will be
+ * completed immediately.
*
* @param latchUid Latch uid.
* @param coordinator Coordinator node.
* @param participants Participant nodes.
* @return Client latch instance.
*/
- private Latch createClientLatch(CompletableLatchUid latchUid, ClusterNode coordinator, Collection<ClusterNode> participants) {
+ private Latch createClientLatch(CompletableLatchUid latchUid, ClusterNode coordinator,
+ Collection<ClusterNode> participants) {
assert !serverLatches.containsKey(latchUid);
assert !clientLatches.containsKey(latchUid);
@@ -202,8 +205,8 @@ public class ExchangeLatchManager {
/**
* Creates new latch with specified {@code id} and {@code topVer} or returns existing latch.
*
- * Participants of latch are calculated from given {@code topVer} as alive server nodes.
- * If local node is coordinator {@code ServerLatch} instance will be created, otherwise {@code ClientLatch} instance.
+ * Participants of latch are calculated from given {@code topVer} as alive server nodes. If local node is
+ * coordinator {@code ServerLatch} instance will be created, otherwise {@code ClientLatch} instance.
*
* @param id Latch id.
* @param topVer Latch topology version.
@@ -281,11 +284,11 @@ public class ExchangeLatchManager {
if (histNodes != null)
return histNodes.stream().filter(n -> !n.isClient() && !n.isDaemon() && discovery.alive(n))
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
else
throw new IgniteException("Topology " + topVer + " not found in discovery history "
- + "; consider increasing IGNITE_DISCOVERY_HISTORY_SIZE property. Current value is "
- + IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE, -1));
+ + "; consider increasing IGNITE_DISCOVERY_HISTORY_SIZE property. Current value is "
+ + IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE, -1));
}
}
@@ -297,9 +300,9 @@ public class ExchangeLatchManager {
Collection<ClusterNode> aliveNodes = aliveNodesForTopologyVer(topVer);
List<ClusterNode> participantNodes = aliveNodes
- .stream()
- .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
- .collect(Collectors.toList());
+ .stream()
+ .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
+ .collect(Collectors.toList());
if (canSkipJoiningNodes(topVer))
return excludeJoinedNodes(participantNodes, topVer);
@@ -417,8 +420,7 @@ public class ExchangeLatchManager {
}
/**
- * Changes coordinator to current local node.
- * Restores all server latches from pending acks and own client latches.
+ * Changes coordinator to current local node. Restores all server latches from pending acks and own client latches.
*/
private void becomeNewCoordinator() {
if (log.isInfoEnabled())
@@ -596,8 +598,7 @@ public class ExchangeLatchManager {
}
/**
- * Receives ack from given node.
- * Count downs latch if ack was not already processed.
+ * Receives ack from given node. Count downs latch if ack was not already processed.
*
* @param from Node.
*/
@@ -609,8 +610,7 @@ public class ExchangeLatchManager {
}
/**
- * Count down latch from ack of given node.
- * Completes latch if all acks are received.
+ * Count down latch from ack of given node. Completes latch if all acks are received.
*
* @param node Node.
*/
@@ -639,14 +639,13 @@ public class ExchangeLatchManager {
Set<UUID> pendingAcks = participants.stream().filter(ack -> !acks.contains(ack)).collect(Collectors.toSet());
return S.toString(ServerLatch.class, this,
- "pendingAcks", pendingAcks,
- "super", super.toString());
+ "pendingAcks", pendingAcks,
+ "super", super.toString());
}
}
/**
- * Latch creating on non-coordinator node.
- * Latch completes when final ack from coordinator is received.
+ * Latch creating on non-coordinator node. Latch completes when final ack from coordinator is received.
*/
class ClientLatch extends CompletableLatch {
/** Latch coordinator node. Can be changed if coordinator is left from topology. */
@@ -698,8 +697,8 @@ public class ExchangeLatchManager {
}
/**
- * Sends ack to coordinator node.
- * There is ack deduplication on coordinator. So it's fine to send same ack twice.
+ * Sends ack to coordinator node. There is ack deduplication on coordinator. So it's fine to send same ack
+ * twice.
*/
private void sendAck() {
ackSent = true;
@@ -721,7 +720,7 @@ public class ExchangeLatchManager {
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ClientLatch.class, this,
- "super", super.toString());
+ "super", super.toString());
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 0298d31..1bae941 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -375,7 +375,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
- // If this is the oldest node.
+ // If this is the oldest node (coordinator) or the cache was added during this exchange.
if (oldest.id().equals(loc.id()) || exchFut.dynamicCacheGroupStarted(grpId)) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 7b9c3c3..171ac33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -42,7 +42,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.jetbrains.annotations.Nullable;
/**
- * DHT partition topology.
+ * Distributed Hash Table (DHT) partition topology.
*/
@GridToStringExclude
public interface GridDhtPartitionTopology {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 026a54f..a17321f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -109,7 +109,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** */
private final AtomicReferenceArray<GridDhtLocalPartition> locParts;
- /** Node to partition map. */
+ /** Node to partition map from all nodes. */
private GridDhtPartitionFullMap node2part;
/** */
@@ -632,7 +632,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
ClusterNode oldest = discoCache.oldestAliveServerNode();
- // If this is the oldest node.
+ // If this is the oldest node (coordinator) or the cache was added during this exchange.
if (oldest != null && (ctx.localNode().equals(oldest) || grpStarted)) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 086247c..a59514e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -64,7 +64,7 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
- * Common logic for near caches.
+ * Common logic for near caches (smaller local cache that stores most recently or most frequently accessed data).
*/
public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAdapter<K, V> {
/** */